You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost when this copy was extracted.)
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/12 09:14:34 UTC

[incubator-paimon] 02/09: [flink][refactor] Extract caseSensitive related codes from MySqlSchema to util (#1123)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 18b43d5a7afe6021e24d20b42a89c8346ad65a4b
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Thu May 11 11:17:07 2023 +0800

    [flink][refactor] Extract caseSensitive related codes from MySqlSchema to util (#1123)
---
 .../flink/action/cdc/mysql/MySqlActionUtils.java   | 54 +++++++++++++-----
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 18 +-----
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  | 66 +++++++++++-----------
 .../action/cdc/mysql/MySqlSyncTableAction.java     | 20 +++----
 4 files changed, 84 insertions(+), 74 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 028b742c2..e63899e23 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -39,9 +40,11 @@ import org.apache.kafka.connect.json.JsonConverterConfig;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 class MySqlActionUtils {
 
@@ -55,25 +58,25 @@ class MySqlActionUtils {
                 mySqlConfig.get(MySqlSourceOptions.PASSWORD));
     }
 
-    static void assertSchemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
-        if (!schemaCompatible(tableSchema, mySqlSchema)) {
+    static void assertSchemaCompatible(TableSchema paimonSchema, Schema mySqlSchema) {
+        if (!schemaCompatible(paimonSchema, mySqlSchema)) {
             throw new IllegalArgumentException(
                     "Paimon schema and MySQL schema are not compatible.\n"
                             + "Paimon fields are: "
-                            + tableSchema.fields()
+                            + paimonSchema.fields()
                             + ".\nMySQL fields are: "
                             + mySqlSchema.fields());
         }
     }
 
-    static boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
-        for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
-            int idx = tableSchema.fieldNames().indexOf(entry.getKey());
+    static boolean schemaCompatible(TableSchema paimonSchema, Schema mySqlSchema) {
+        for (DataField field : mySqlSchema.fields()) {
+            int idx = paimonSchema.fieldNames().indexOf(field.name());
             if (idx < 0) {
                 return false;
             }
-            DataType type = tableSchema.fields().get(idx).type();
-            if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue().f0, type)
+            DataType type = paimonSchema.fields().get(idx).type();
+            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
                     != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
                 return false;
             }
@@ -85,24 +88,49 @@ class MySqlActionUtils {
             MySqlSchema mySqlSchema,
             List<String> specifiedPartitionKeys,
             List<String> specifiedPrimaryKeys,
-            Map<String, String> paimonConfig) {
+            Map<String, String> paimonConfig,
+            boolean caseSensitive) {
         Schema.Builder builder = Schema.newBuilder();
         builder.options(paimonConfig);
 
-        for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
+        // build columns and primary keys from mySqlSchema
+        LinkedHashMap<String, Tuple2<DataType, String>> mySqlFields;
+        List<String> mySqlPrimaryKeys;
+        if (caseSensitive) {
+            mySqlFields = mySqlSchema.fields();
+            mySqlPrimaryKeys = mySqlSchema.primaryKeys();
+        } else {
+            mySqlFields = new LinkedHashMap<>();
+            for (Map.Entry<String, Tuple2<DataType, String>> entry :
+                    mySqlSchema.fields().entrySet()) {
+                String fieldName = entry.getKey();
+                checkArgument(
+                        !mySqlFields.containsKey(fieldName.toLowerCase()),
+                        String.format(
+                                "Duplicate key '%s' in table '%s.%s' appears when converting fields map keys to case-insensitive form.",
+                                fieldName, mySqlSchema.databaseName(), mySqlSchema.tableName()));
+                mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
+            }
+            mySqlPrimaryKeys =
+                    mySqlSchema.primaryKeys().stream()
+                            .map(String::toLowerCase)
+                            .collect(Collectors.toList());
+        }
+
+        for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlFields.entrySet()) {
             builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
         }
 
         if (specifiedPrimaryKeys.size() > 0) {
             for (String key : specifiedPrimaryKeys) {
-                if (!mySqlSchema.fields().containsKey(key)) {
+                if (!mySqlFields.containsKey(key)) {
                     throw new IllegalArgumentException(
                             "Specified primary key " + key + " does not exist in MySQL tables");
                 }
             }
             builder.primaryKey(specifiedPrimaryKeys);
-        } else if (mySqlSchema.primaryKeys().size() > 0) {
-            builder.primaryKey(mySqlSchema.primaryKeys());
+        } else if (mySqlPrimaryKeys.size() > 0) {
+            builder.primaryKey(mySqlPrimaryKeys);
         } else {
             throw new IllegalArgumentException(
                     "Primary keys are not specified. "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 2b390b81e..15a731487 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -30,8 +30,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** Utility class to load MySQL table schema with JDBC. */
 public class MySqlSchema {
 
@@ -41,8 +39,7 @@ public class MySqlSchema {
     private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
     private final List<String> primaryKeys;
 
-    public MySqlSchema(
-            DatabaseMetaData metaData, String databaseName, String tableName, boolean caseSensitive)
+    public MySqlSchema(DatabaseMetaData metaData, String databaseName, String tableName)
             throws Exception {
         this.databaseName = databaseName;
         this.tableName = tableName;
@@ -62,14 +59,6 @@ public class MySqlSchema {
                 if (rs.wasNull()) {
                     scale = null;
                 }
-                if (!caseSensitive) {
-                    checkArgument(
-                            !fields.containsKey(fieldName.toLowerCase()),
-                            String.format(
-                                    "Duplicate key '%s' in table '%s.%s' appears when converting fields map keys to case-insensitive form.",
-                                    fieldName, databaseName, tableName));
-                    fieldName = fieldName.toLowerCase();
-                }
                 fields.put(
                         fieldName,
                         Tuple2.of(
@@ -82,9 +71,6 @@ public class MySqlSchema {
         try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
             while (rs.next()) {
                 String fieldName = rs.getString("COLUMN_NAME");
-                if (!caseSensitive) {
-                    fieldName = fieldName.toLowerCase();
-                }
                 primaryKeys.add(fieldName);
             }
         }
@@ -98,7 +84,7 @@ public class MySqlSchema {
         return tableName;
     }
 
-    public Map<String, Tuple2<DataType, String>> fields() {
+    public LinkedHashMap<String, Tuple2<DataType, String>> fields() {
         return fields;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index d39702383..de8785b3d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -53,6 +53,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.flink.action.Action.getConfigMap;
@@ -169,7 +170,7 @@ public class MySqlSyncDatabaseAction implements Action {
             validateCaseInsensitive();
         }
 
-        List<MySqlSchema> mySqlSchemas = getMySqlSchemaList(caseSensitive);
+        List<MySqlSchema> mySqlSchemas = getMySqlSchemaList();
         checkArgument(
                 mySqlSchemas.size() > 0,
                 "No tables found in MySQL database "
@@ -186,19 +187,22 @@ public class MySqlSyncDatabaseAction implements Action {
             String paimonTableName = tableNameConverter.convert(mySqlSchema.tableName());
             Identifier identifier = new Identifier(database, paimonTableName);
             FileStoreTable table;
+            Schema fromMySql =
+                    MySqlActionUtils.buildPaimonSchema(
+                            mySqlSchema,
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            tableConfig,
+                            caseSensitive);
             try {
                 table = (FileStoreTable) catalog.getTable(identifier);
-                if (shouldMonitorTable(table.schema(), mySqlSchema, identifier)) {
+                Supplier<String> errMsg =
+                        incompatibleMessage(table.schema(), mySqlSchema, identifier);
+                if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
                     monitoredTables.add(mySqlSchema.tableName());
                 }
             } catch (Catalog.TableNotExistException e) {
-                Schema schema =
-                        MySqlActionUtils.buildPaimonSchema(
-                                mySqlSchema,
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                tableConfig);
-                catalog.createTable(identifier, schema, false);
+                catalog.createTable(identifier, fromMySql, false);
                 table = (FileStoreTable) catalog.getTable(identifier);
                 monitoredTables.add(mySqlSchema.tableName());
             }
@@ -251,7 +255,7 @@ public class MySqlSyncDatabaseAction implements Action {
                         tablePrefix));
     }
 
-    private List<MySqlSchema> getMySqlSchemaList(boolean caseSensitive) throws Exception {
+    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
         String databaseName = mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME);
         List<MySqlSchema> mySqlSchemaList = new ArrayList<>();
         try (Connection conn = MySqlActionUtils.getConnection(mySqlConfig)) {
@@ -263,8 +267,7 @@ public class MySqlSyncDatabaseAction implements Action {
                     if (!shouldMonitorTable(tableName)) {
                         continue;
                     }
-                    MySqlSchema mySqlSchema =
-                            new MySqlSchema(metaData, databaseName, tableName, caseSensitive);
+                    MySqlSchema mySqlSchema = new MySqlSchema(metaData, databaseName, tableName);
                     if (mySqlSchema.primaryKeys().size() > 0) {
                         // only tables with primary keys will be considered
                         mySqlSchemaList.add(mySqlSchema);
@@ -288,36 +291,33 @@ public class MySqlSyncDatabaseAction implements Action {
     }
 
     private boolean shouldMonitorTable(
-            TableSchema tableSchema, MySqlSchema mySqlSchema, Identifier identifier) {
+            TableSchema tableSchema, Schema mySqlSchema, Supplier<String> errMsg) {
         if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
             return true;
         } else if (ignoreIncompatible) {
-            LOG.warn(
-                    "Incompatible schema found. This table will be ignored.\n"
-                            + "Paimon table is: {}, fields are: {}.\n"
-                            + "MySQL table is: {}.{}, fields are: {}.",
-                    identifier.getFullName(),
-                    tableSchema.fields(),
-                    mySqlSchema.databaseName(),
-                    mySqlSchema.tableName(),
-                    mySqlSchema.fields());
+            LOG.warn(errMsg.get() + "This table will be ignored.");
             return false;
         } else {
             throw new IllegalArgumentException(
-                    String.format(
-                            "Incompatible schema found.\n"
-                                    + "Paimon table is: %s, fields are: %s.\n"
-                                    + "MySQL table is: %s.%s, fields are: %s.\n"
-                                    + "If you want to ignore the incompatible tables, "
-                                    + "please specify --ignore-incompatible to true.",
-                            identifier.getFullName(),
-                            tableSchema.fields(),
-                            mySqlSchema.databaseName(),
-                            mySqlSchema.tableName(),
-                            mySqlSchema.fields()));
+                    errMsg.get()
+                            + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
         }
     }
 
+    private Supplier<String> incompatibleMessage(
+            TableSchema paimonSchema, MySqlSchema mySqlSchema, Identifier identifier) {
+        return () ->
+                String.format(
+                        "Incompatible schema found.\n"
+                                + "Paimon table is: %s, fields are: %s.\n"
+                                + "MySQL table is: %s.%s, fields are: %s.\n",
+                        identifier.getFullName(),
+                        paimonSchema.fields(),
+                        mySqlSchema.databaseName(),
+                        mySqlSchema.tableName(),
+                        mySqlSchema.fields());
+    }
+
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 13f62d480..153af35f0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -131,7 +131,7 @@ public class MySqlSyncTableAction implements Action {
         }
 
         MySqlSchema mySqlSchema =
-                getMySqlSchemaList(caseSensitive).stream()
+                getMySqlSchemaList().stream()
                         .reduce(MySqlSchema::merge)
                         .orElseThrow(
                                 () ->
@@ -142,14 +142,14 @@ public class MySqlSyncTableAction implements Action {
 
         Identifier identifier = new Identifier(database, table);
         FileStoreTable table;
+        Schema fromMySql =
+                MySqlActionUtils.buildPaimonSchema(
+                        mySqlSchema, partitionKeys, primaryKeys, tableConfig, caseSensitive);
         try {
             table = (FileStoreTable) catalog.getTable(identifier);
-            MySqlActionUtils.assertSchemaCompatible(table.schema(), mySqlSchema);
+            MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
         } catch (Catalog.TableNotExistException e) {
-            Schema schema =
-                    MySqlActionUtils.buildPaimonSchema(
-                            mySqlSchema, partitionKeys, primaryKeys, tableConfig);
-            catalog.createTable(identifier, schema, false);
+            catalog.createTable(identifier, fromMySql, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
 
@@ -199,7 +199,7 @@ public class MySqlSyncTableAction implements Action {
         }
     }
 
-    private List<MySqlSchema> getMySqlSchemaList(boolean caseSensitive) throws Exception {
+    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
         Pattern databasePattern =
                 Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
         Pattern tablePattern = Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME));
@@ -217,11 +217,7 @@ public class MySqlSyncTableAction implements Action {
                                 Matcher tableMatcher = tablePattern.matcher(tableName);
                                 if (tableMatcher.matches()) {
                                     mySqlSchemaList.add(
-                                            new MySqlSchema(
-                                                    metaData,
-                                                    databaseName,
-                                                    tableName,
-                                                    caseSensitive));
+                                            new MySqlSchema(metaData, databaseName, tableName));
                                 }
                             }
                         }