You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/12 09:14:34 UTC
[incubator-paimon] 02/09: [flink][refactor] Extract caseSensitive related codes from MySqlSchema to util (#1123)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 18b43d5a7afe6021e24d20b42a89c8346ad65a4b
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Thu May 11 11:17:07 2023 +0800
[flink][refactor] Extract caseSensitive related codes from MySqlSchema to util (#1123)
---
.../flink/action/cdc/mysql/MySqlActionUtils.java | 54 +++++++++++++-----
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 18 +-----
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 66 +++++++++++-----------
.../action/cdc/mysql/MySqlSyncTableAction.java | 20 +++----
4 files changed, 84 insertions(+), 74 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 028b742c2..e63899e23 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -39,9 +40,11 @@ import org.apache.kafka.connect.json.JsonConverterConfig;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
class MySqlActionUtils {
@@ -55,25 +58,25 @@ class MySqlActionUtils {
mySqlConfig.get(MySqlSourceOptions.PASSWORD));
}
- static void assertSchemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
- if (!schemaCompatible(tableSchema, mySqlSchema)) {
+ static void assertSchemaCompatible(TableSchema paimonSchema, Schema mySqlSchema) {
+ if (!schemaCompatible(paimonSchema, mySqlSchema)) {
throw new IllegalArgumentException(
"Paimon schema and MySQL schema are not compatible.\n"
+ "Paimon fields are: "
- + tableSchema.fields()
+ + paimonSchema.fields()
+ ".\nMySQL fields are: "
+ mySqlSchema.fields());
}
}
- static boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
- for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
- int idx = tableSchema.fieldNames().indexOf(entry.getKey());
+ static boolean schemaCompatible(TableSchema paimonSchema, Schema mySqlSchema) {
+ for (DataField field : mySqlSchema.fields()) {
+ int idx = paimonSchema.fieldNames().indexOf(field.name());
if (idx < 0) {
return false;
}
- DataType type = tableSchema.fields().get(idx).type();
- if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue().f0, type)
+ DataType type = paimonSchema.fields().get(idx).type();
+ if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
return false;
}
@@ -85,24 +88,49 @@ class MySqlActionUtils {
MySqlSchema mySqlSchema,
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
- Map<String, String> paimonConfig) {
+ Map<String, String> paimonConfig,
+ boolean caseSensitive) {
Schema.Builder builder = Schema.newBuilder();
builder.options(paimonConfig);
- for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
+ // build columns and primary keys from mySqlSchema
+ LinkedHashMap<String, Tuple2<DataType, String>> mySqlFields;
+ List<String> mySqlPrimaryKeys;
+ if (caseSensitive) {
+ mySqlFields = mySqlSchema.fields();
+ mySqlPrimaryKeys = mySqlSchema.primaryKeys();
+ } else {
+ mySqlFields = new LinkedHashMap<>();
+ for (Map.Entry<String, Tuple2<DataType, String>> entry :
+ mySqlSchema.fields().entrySet()) {
+ String fieldName = entry.getKey();
+ checkArgument(
+ !mySqlFields.containsKey(fieldName.toLowerCase()),
+ String.format(
+ "Duplicate key '%s' in table '%s.%s' appears when converting fields map keys to case-insensitive form.",
+ fieldName, mySqlSchema.databaseName(), mySqlSchema.tableName()));
+ mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
+ }
+ mySqlPrimaryKeys =
+ mySqlSchema.primaryKeys().stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ }
+
+ for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlFields.entrySet()) {
builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
}
if (specifiedPrimaryKeys.size() > 0) {
for (String key : specifiedPrimaryKeys) {
- if (!mySqlSchema.fields().containsKey(key)) {
+ if (!mySqlFields.containsKey(key)) {
throw new IllegalArgumentException(
"Specified primary key " + key + " does not exist in MySQL tables");
}
}
builder.primaryKey(specifiedPrimaryKeys);
- } else if (mySqlSchema.primaryKeys().size() > 0) {
- builder.primaryKey(mySqlSchema.primaryKeys());
+ } else if (mySqlPrimaryKeys.size() > 0) {
+ builder.primaryKey(mySqlPrimaryKeys);
} else {
throw new IllegalArgumentException(
"Primary keys are not specified. "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 2b390b81e..15a731487 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -30,8 +30,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** Utility class to load MySQL table schema with JDBC. */
public class MySqlSchema {
@@ -41,8 +39,7 @@ public class MySqlSchema {
private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
private final List<String> primaryKeys;
- public MySqlSchema(
- DatabaseMetaData metaData, String databaseName, String tableName, boolean caseSensitive)
+ public MySqlSchema(DatabaseMetaData metaData, String databaseName, String tableName)
throws Exception {
this.databaseName = databaseName;
this.tableName = tableName;
@@ -62,14 +59,6 @@ public class MySqlSchema {
if (rs.wasNull()) {
scale = null;
}
- if (!caseSensitive) {
- checkArgument(
- !fields.containsKey(fieldName.toLowerCase()),
- String.format(
- "Duplicate key '%s' in table '%s.%s' appears when converting fields map keys to case-insensitive form.",
- fieldName, databaseName, tableName));
- fieldName = fieldName.toLowerCase();
- }
fields.put(
fieldName,
Tuple2.of(
@@ -82,9 +71,6 @@ public class MySqlSchema {
try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
- if (!caseSensitive) {
- fieldName = fieldName.toLowerCase();
- }
primaryKeys.add(fieldName);
}
}
@@ -98,7 +84,7 @@ public class MySqlSchema {
return tableName;
}
- public Map<String, Tuple2<DataType, String>> fields() {
+ public LinkedHashMap<String, Tuple2<DataType, String>> fields() {
return fields;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index d39702383..de8785b3d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -53,6 +53,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import static org.apache.paimon.flink.action.Action.getConfigMap;
@@ -169,7 +170,7 @@ public class MySqlSyncDatabaseAction implements Action {
validateCaseInsensitive();
}
- List<MySqlSchema> mySqlSchemas = getMySqlSchemaList(caseSensitive);
+ List<MySqlSchema> mySqlSchemas = getMySqlSchemaList();
checkArgument(
mySqlSchemas.size() > 0,
"No tables found in MySQL database "
@@ -186,19 +187,22 @@ public class MySqlSyncDatabaseAction implements Action {
String paimonTableName = tableNameConverter.convert(mySqlSchema.tableName());
Identifier identifier = new Identifier(database, paimonTableName);
FileStoreTable table;
+ Schema fromMySql =
+ MySqlActionUtils.buildPaimonSchema(
+ mySqlSchema,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ tableConfig,
+ caseSensitive);
try {
table = (FileStoreTable) catalog.getTable(identifier);
- if (shouldMonitorTable(table.schema(), mySqlSchema, identifier)) {
+ Supplier<String> errMsg =
+ incompatibleMessage(table.schema(), mySqlSchema, identifier);
+ if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
monitoredTables.add(mySqlSchema.tableName());
}
} catch (Catalog.TableNotExistException e) {
- Schema schema =
- MySqlActionUtils.buildPaimonSchema(
- mySqlSchema,
- Collections.emptyList(),
- Collections.emptyList(),
- tableConfig);
- catalog.createTable(identifier, schema, false);
+ catalog.createTable(identifier, fromMySql, false);
table = (FileStoreTable) catalog.getTable(identifier);
monitoredTables.add(mySqlSchema.tableName());
}
@@ -251,7 +255,7 @@ public class MySqlSyncDatabaseAction implements Action {
tablePrefix));
}
- private List<MySqlSchema> getMySqlSchemaList(boolean caseSensitive) throws Exception {
+ private List<MySqlSchema> getMySqlSchemaList() throws Exception {
String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
@@ -263,8 +267,7 @@ public class MySqlSyncDatabaseAction implements Action {
if (!shouldMonitorTable(tableName)) {
continue;
}
- MySqlSchema mySqlSchema =
- new MySqlSchema(metaData, databaseName, tableName, caseSensitive);
+ MySqlSchema mySqlSchema = new MySqlSchema(metaData, databaseName, tableName);
if (mySqlSchema.primaryKeys().size() > 0) {
// only tables with primary keys will be considered
mySqlSchemaList.add(mySqlSchema);
@@ -288,36 +291,33 @@ public class MySqlSyncDatabaseAction implements Action {
}
private boolean shouldMonitorTable(
- TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier identifier) {
+ TableSchema tableSchema, Schema mySqlSchema, Supplier<String> errMsg) {
if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
return true;
} else if (ignoreIncompatible) {
- LOG.warn(
- "Incompatible schema found. This table will be ignored.\n"
- + "Paimon table is: {}, fields are: {}.\n"
- + "MySQL table is: {}.{}, fields are: {}.",
- identifier.getFullName(),
- tableSchema.fields(),
- mySqlSchema.databaseName(),
- mySqlSchema.tableName(),
- mySqlSchema.fields());
+ LOG.warn(errMsg.get() + "This table will be ignored.");
return false;
} else {
throw new IllegalArgumentException(
- String.format(
- "Incompatible schema found.\n"
- + "Paimon table is: %s, fields are: %s.\n"
- + "MySQL table is: %s.%s, fields are: %s.\n"
- + "If you want to ignore the incompatible tables, "
- + "please specify --ignore-incompatible to true.",
- identifier.getFullName(),
- tableSchema.fields(),
- mySqlSchema.databaseName(),
- mySqlSchema.tableName(),
- mySqlSchema.fields()));
+ errMsg.get()
+ + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
}
}
+ private Supplier<String> incompatibleMessage(
+ TableSchema paimonSchema, MySqlSchema mySqlSchema, Identifier identifier) {
+ return () ->
+ String.format(
+ "Incompatible schema found.\n"
+ + "Paimon table is: %s, fields are: %s.\n"
+ + "MySQL table is: %s.%s, fields are: %s.\n",
+ identifier.getFullName(),
+ paimonSchema.fields(),
+ mySqlSchema.databaseName(),
+ mySqlSchema.tableName(),
+ mySqlSchema.fields());
+ }
+
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 13f62d480..153af35f0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -131,7 +131,7 @@ public class MySqlSyncTableAction implements Action {
}
MySqlSchema mySqlSchema =
- getMySqlSchemaList(caseSensitive).stream()
+ getMySqlSchemaList().stream()
.reduce(MySqlSchema::merge)
.orElseThrow(
() ->
@@ -142,14 +142,14 @@ public class MySqlSyncTableAction implements Action {
Identifier identifier = new Identifier(database, table);
FileStoreTable table;
+ Schema fromMySql =
+ MySqlActionUtils.buildPaimonSchema(
+ mySqlSchema, partitionKeys, primaryKeys, tableConfig, caseSensitive);
try {
table = (FileStoreTable) catalog.getTable(identifier);
- MySqlActionUtils.assertSchemaCompatible(table.schema(), mySqlSchema);
+ MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
} catch (Catalog.TableNotExistException e) {
- Schema schema =
- MySqlActionUtils.buildPaimonSchema(
- mySqlSchema, partitionKeys, primaryKeys, tableConfig);
- catalog.createTable(identifier, schema, false);
+ catalog.createTable(identifier, fromMySql, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
@@ -199,7 +199,7 @@ public class MySqlSyncTableAction implements Action {
}
}
- private List<MySqlSchema> getMySqlSchemaList(boolean caseSensitive) throws Exception {
+ private List<MySqlSchema> getMySqlSchemaList() throws Exception {
Pattern databasePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
Pattern tablePattern = Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
@@ -217,11 +217,7 @@ public class MySqlSyncTableAction implements Action {
Matcher tableMatcher = tablePattern.matcher(tableName);
if (tableMatcher.matches()) {
mySqlSchemaList.add(
- new MySqlSchema(
- metaData,
- databaseName,
- tableName,
- caseSensitive));
+ new MySqlSchema(metaData, databaseName, tableName));
}
}
}