You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/06/27 09:25:09 UTC
[flink] branch release-1.15 updated: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 1d0dac0e007 [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases
1d0dac0e007 is described below
commit 1d0dac0e00739a4cf176cba6dc091662e48170c3
Author: dusukang <68...@users.noreply.github.com>
AuthorDate: Mon Jun 27 17:25:02 2022 +0800
[FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases
This closes #19741.
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 21 ++++++--
.../flink/connector/jdbc/catalog/MySqlCatalog.java | 2 +-
.../connector/jdbc/catalog/MySqlCatalogITCase.java | 49 ++++++++++++++++--
.../jdbc/catalog/MySqlCatalogTestBase.java | 58 +++++++++++++++-------
.../mysql-scripts/catalog-init-for-test.sql | 32 ++++++++++--
5 files changed, 129 insertions(+), 33 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index a2e46ff0573..82410850420 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -144,12 +144,17 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
// ------ retrieve PK constraint ------
protected Optional<UniqueConstraint> getPrimaryKey(
- DatabaseMetaData metaData, String schema, String table) throws SQLException {
+ DatabaseMetaData metaData, String database, String schema, String table)
+ throws SQLException {
// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
- ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+ // In the currently supported database dialects MySQL and Postgres,
+ // the database term is equivalent to catalog term.
+ // We need to pass the database name as catalog parameter for retrieving primary keys by
+ // full table identifier.
+ ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
Map<Integer, String> keySeqColumnName = new HashMap<>();
String pkName = null;
@@ -157,6 +162,9 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
String columnName = rs.getString("COLUMN_NAME");
pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
int keySeq = rs.getInt("KEY_SEQ");
+ Preconditions.checkState(
+ !keySeqColumnName.containsKey(keySeq - 1),
+ "The field(s) of primary key must be from the same table.");
keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
}
List<String> pkFields =
@@ -228,12 +236,17 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
throw new TableNotExistException(getName(), tablePath);
}
- String dbUrl = baseUrl + tablePath.getDatabaseName();
+ String databaseName = tablePath.getDatabaseName();
+ String dbUrl = baseUrl + databaseName;
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<UniqueConstraint> primaryKey =
- getPrimaryKey(metaData, getSchemaName(tablePath), getTableName(tablePath));
+ getPrimaryKey(
+ metaData,
+ databaseName,
+ getSchemaName(tablePath),
+ getTableName(tablePath));
PreparedStatement ps =
conn.prepareStatement(
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
index 6e21f818860..de3dfe200b4 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
@@ -152,7 +152,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
@Override
protected String getSchemaName(ObjectPath tablePath) {
- return null;
+ return tablePath.getDatabaseName();
}
@Override
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
index 0e2c48656e3..7f56525ab45 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
@@ -18,7 +18,9 @@
package org.apache.flink.connector.jdbc.catalog;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
@@ -32,6 +34,8 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.math.BigDecimal;
import java.sql.Date;
@@ -49,6 +53,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** E2E test for {@link MySqlCatalog}. */
+@RunWith(Parameterized.class)
public class MySqlCatalogITCase extends MySqlCatalogTestBase {
private static final List<Row> ALL_TYPES_ROWS =
@@ -75,7 +80,6 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
1L,
-1,
1L,
- "{\"k1\": \"v1\"}",
null,
"col_longtext",
null,
@@ -123,7 +127,6 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
1L,
-1,
1L,
- "{\"k1\": \"v1\"}",
null,
"col_longtext",
null,
@@ -150,6 +153,7 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
null));
+ private final MySqlCatalog catalog;
private TableEnvironment tEnv;
@Before
@@ -162,6 +166,15 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
tEnv.useCatalog(TEST_CATALOG_NAME);
}
+ public MySqlCatalogITCase(String version) {
+ catalog = CATALOGS.get(version);
+ }
+
+ @Parameterized.Parameters(name = "version = {0}")
+ public static String[] params() {
+ return DOCKER_IMAGE_NAMES.toArray(new String[0]);
+ }
+
// ------ databases ------
@Test
@@ -179,7 +192,7 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
@Test
public void testListDatabases() {
List<String> actual = catalog.listDatabases();
- assertEquals(Collections.singletonList(TEST_DB), actual);
+ assertEquals(Arrays.asList(TEST_DB, TEST_DB2), actual);
}
@Test
@@ -198,7 +211,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
Arrays.asList(
TEST_TABLE_ALL_TYPES,
TEST_SINK_TABLE_ALL_TYPES,
- TEST_TABLE_SINK_FROM_GROUPED_BY),
+ TEST_TABLE_SINK_FROM_GROUPED_BY,
+ TEST_TABLE_PK),
actual);
}
@@ -251,6 +265,33 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
}
+ @Test
+ public void testGetTablePrimaryKey() throws TableNotExistException {
+ // test the PK of test.t_user
+ Schema tableSchemaTestPK1 =
+ Schema.newBuilder()
+ .column("uid", DataTypes.BIGINT().notNull())
+ .column("col_bigint", DataTypes.BIGINT())
+ .primaryKeyNamed("PRIMARY", Collections.singletonList("uid"))
+ .build();
+ CatalogBaseTable tablePK1 = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE_PK));
+ assertEquals(
+ tableSchemaTestPK1.getPrimaryKey().get(),
+ tablePK1.getUnresolvedSchema().getPrimaryKey().get());
+
+ // test the PK of test2.t_user
+ Schema tableSchemaTestPK2 =
+ Schema.newBuilder()
+ .column("pid", DataTypes.INT().notNull())
+ .column("col_varchar", DataTypes.VARCHAR(255))
+ .primaryKeyNamed("PRIMARY", Collections.singletonList("pid"))
+ .build();
+ CatalogBaseTable tablePK2 = catalog.getTable(new ObjectPath(TEST_DB2, TEST_TABLE_PK));
+ assertEquals(
+ tableSchemaTestPK2.getPrimaryKey().get(),
+ tablePK2.getUnresolvedSchema().getPrimaryKey().get());
+ }
+
// ------ test select query. ------
@Test
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index d2e01c6bc39..c06d0d6c266 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -23,15 +23,18 @@ import org.apache.flink.table.api.Schema;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
+import java.sql.SQLException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Test base for {@link MySqlCatalog}. */
@@ -39,14 +42,17 @@ public class MySqlCatalogTestBase {
public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
- protected static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34");
+ protected static final List<String> DOCKER_IMAGE_NAMES =
+ Arrays.asList("mysql:5.6.51", "mysql:5.7.34", "mysql:8.0.16");
protected static final String TEST_CATALOG_NAME = "mysql_catalog";
protected static final String TEST_USERNAME = "mysql";
protected static final String TEST_PWD = "mysql";
protected static final String TEST_DB = "test";
+ protected static final String TEST_DB2 = "test2";
protected static final String TEST_TABLE_ALL_TYPES = "t_all_types";
protected static final String TEST_SINK_TABLE_ALL_TYPES = "t_all_types_sink";
protected static final String TEST_TABLE_SINK_FROM_GROUPED_BY = "t_grouped_by_sink";
+ protected static final String TEST_TABLE_PK = "t_pk";
protected static final String MYSQL_INIT_SCRIPT = "mysql-scripts/catalog-init-for-test.sql";
protected static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
new HashMap<String, String>() {
@@ -77,7 +83,6 @@ public class MySqlCatalogTestBase {
.column("col_int_unsigned", DataTypes.BIGINT())
.column("col_integer", DataTypes.INT())
.column("col_integer_unsigned", DataTypes.BIGINT())
- .column("col_json", DataTypes.STRING())
.column("col_longblob", DataTypes.BYTES())
.column("col_longtext", DataTypes.STRING())
.column("col_mediumblob", DataTypes.BYTES())
@@ -106,23 +111,38 @@ public class MySqlCatalogTestBase {
.primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
.build();
- @ClassRule
- public static final MySQLContainer<?> MYSQL_CONTAINER =
- new MySQLContainer<>(MYSQL_57_IMAGE)
- .withUsername("root")
- .withPassword("")
- .withEnv(DEFAULT_CONTAINER_ENV_MAP)
- .withInitScript(MYSQL_INIT_SCRIPT)
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
- protected static MySqlCatalog catalog;
+ public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new HashMap<>();
+ public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
@BeforeClass
- public static void beforeAll() {
- String baseUrl =
- MYSQL_CONTAINER
- .getJdbcUrl()
- .substring(0, MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/"));
- catalog = new MySqlCatalog(TEST_CATALOG_NAME, TEST_DB, TEST_USERNAME, TEST_PWD, baseUrl);
+ public static void beforeAll() throws SQLException {
+ for (String dockerImageName : DOCKER_IMAGE_NAMES) {
+ MySQLContainer<?> container =
+ new MySQLContainer<>(DockerImageName.parse(dockerImageName))
+ .withUsername("root")
+ .withPassword("")
+ .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+ .withInitScript(MYSQL_INIT_SCRIPT)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ container.start();
+ MYSQL_CONTAINERS.put(dockerImageName, container);
+ CATALOGS.put(
+ dockerImageName,
+ new MySqlCatalog(
+ TEST_CATALOG_NAME,
+ TEST_DB,
+ TEST_USERNAME,
+ TEST_PWD,
+ container
+ .getJdbcUrl()
+ .substring(0, container.getJdbcUrl().lastIndexOf("/"))));
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) {
+ container.stop();
+ }
}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql b/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql
index a57dc9f4f3a..98f44b438dd 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql
@@ -17,8 +17,8 @@
*/
/**
-* The test for mysql 5.7.X & 8.0.X versions.
-* The init script contains some types that are incompatible with 5.6.X or lower versions.
+* The test for mysql 5.6.X & 5.7.X & 8.0.X versions.
+* The init script contains some types that are incompatible with lower versions.
*/
-- Creates test user info and grants privileges.
@@ -59,7 +59,6 @@ CREATE TABLE `t_all_types` (
`col_int_unsigned` int(10) unsigned DEFAULT NULL,
`col_integer` int(11) DEFAULT NULL,
`col_integer_unsigned` int(10) unsigned DEFAULT NULL,
- `col_json` json DEFAULT NULL,
`col_longblob` longblob,
`col_longtext` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_bin,
`col_mediumblob` mediumblob,
@@ -91,8 +90,8 @@ CREATE TABLE `t_all_types` (
-- ----------------------------
-- Records of t_all_types
-- ----------------------------
-INSERT INTO `t_all_types` VALUES (1, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:54:16', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, '{\"k1\": \"v1\"}', null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:54:16', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:54:16.463', '09:33:43.000', '2021-08-04 01:54:16.463', null);
-INSERT INTO `t_all_types` VALUES (2, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:53:19', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, '{\"k1\": \"v1\"}', null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1,set_ele12', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:53:19', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:53:19.098', '09:33:43.000', '2021-08-04 01:53:19.098', null);
+INSERT INTO `t_all_types` VALUES (1, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:54:16', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:54:16', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:54:16.463', '09:33:43.000', '2021-08-04 01:54:16.463', null);
+INSERT INTO `t_all_types` VALUES (2, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:53:19', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1,set_ele12', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:53:19', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:53:19.098', '09:33:43.000', '2021-08-04 01:53:19.098', null);
-- Create test table t_all_types_sink.
DROP TABLE IF EXISTS `t_all_types_sink`;
@@ -105,3 +104,26 @@ CREATE TABLE `t_grouped_by_sink` (
`col_bigint` bigint(20) DEFAULT NULL,
PRIMARY KEY (`pid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+-- Create test table t_pk.
+DROP TABLE IF EXISTS `t_pk`;
+CREATE TABLE `t_pk` (
+ `uid` bigint(20) NOT NULL AUTO_INCREMENT,
+ `col_bigint` bigint(20) DEFAULT NULL,
+ PRIMARY KEY (`uid`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+-- Create the `test2` database.
+DROP DATABASE IF EXISTS `test2`;
+CREATE DATABASE `test2` CHARSET=utf8;
+
+-- Uses `test2` database.
+use `test2`;
+
+-- Create test table t_pk.
+DROP TABLE IF EXISTS `t_pk`;
+CREATE TABLE `t_pk` (
+ `pid` int(11) NOT NULL AUTO_INCREMENT,
+ `col_varchar` varchar(255) DEFAULT NULL,
+ PRIMARY KEY (`pid`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;