Posted to commits@inlong.apache.org by zi...@apache.org on 2023/04/10 03:04:09 UTC

[inlong] branch master updated: [INLONG-7693][Sort] MySQL CDC Connector supports specifying field synchronization (#7694)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f87204741 [INLONG-7693][Sort] MySQL CDC Connector supports specifying field synchronization (#7694)
f87204741 is described below

commit f872047417fefd3af0994872a4fae755c58c32f0
Author: chen.zs <34...@qq.com>
AuthorDate: Mon Apr 10 11:04:03 2023 +0800

    [INLONG-7693][Sort] MySQL CDC Connector supports specifying field synchronization (#7694)
---
 .../sort/cdc/base/util/ColumnFilterUtil.java       |  69 +++++++++++
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |   3 +-
 .../mysql/source/reader/MySqlRecordEmitter.java    |  19 +++-
 .../parser/AllMigrateWithSpecifyingFieldTest.java  | 126 +++++++++++++++++++++
 4 files changed, 210 insertions(+), 7 deletions(-)
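
In short, the change builds a Debezium ColumnNameFilter from the column.include.list/column.exclude.list options and applies it to every TableChange before schema-change output and row deserialization, so only the selected fields are synchronized. A minimal sketch of the extract-node options that enable this (hypothetical database/table/column names, mirroring the new test below):

    Map<String, String> option = new HashMap<>();
    option.put("migrate-all", "true");
    // Only the listed columns (database.table.column) are synchronized
    option.put("debezium.column.include.list", "test.table1.id,test.table1.name");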

diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ColumnFilterUtil.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ColumnFilterUtil.java
new file mode 100644
index 000000000..07aaa3a35
--- /dev/null
+++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ColumnFilterUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.ColumnFilterMode;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
+
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST;
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_WHITELIST;
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_EXCLUDE_LIST;
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_INCLUDE_LIST;
+
+/**
+ * Utility class that combines the column.exclude.list/column.blacklist and
+ * column.include.list/column.whitelist parameters to generate a new TableChange object.
+ */
+public class ColumnFilterUtil {
+
+    public static Tables.ColumnNameFilter createColumnFilter(
+            Configuration configuration, ColumnFilterMode columnFilterMode) {
+
+        String columnExcludeList = configuration.getFallbackStringProperty(COLUMN_EXCLUDE_LIST, COLUMN_BLACKLIST);
+        String columnIncludeList = configuration.getFallbackStringProperty(COLUMN_INCLUDE_LIST, COLUMN_WHITELIST);
+
+        Tables.ColumnNameFilter columnFilter;
+        if (columnIncludeList != null) {
+            columnFilter = Tables.ColumnNameFilterFactory.createIncludeListFilter(columnIncludeList, columnFilterMode);
+        } else {
+            columnFilter = Tables.ColumnNameFilterFactory.createExcludeListFilter(columnExcludeList, columnFilterMode);
+        }
+
+        return columnFilter;
+    }
+
+    public static TableChanges.TableChange createTableChange(
+            TableChanges.TableChange oldTableChange, Tables.ColumnNameFilter columnNameFilter) {
+        Table table = oldTableChange.getTable();
+        TableEditor tableEditor = table.edit();
+        table.columns()
+                .stream()
+                .filter(column -> !columnNameFilter.matches(
+                        table.id().catalog(),
+                        table.id().schema(),
+                        table.id().table(),
+                        column.name()))
+                .forEach(column -> tableEditor.removeColumn(column.name()));
+
+        return new TableChanges.TableChange(oldTableChange.getType(), tableEditor.create());
+    }
+}
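
For illustration, a minimal, self-contained sketch of how the two helpers compose (the table layout and option values are hypothetical, not part of this commit):

    import io.debezium.config.Configuration;
    import io.debezium.relational.Column;
    import io.debezium.relational.ColumnFilterMode;
    import io.debezium.relational.Table;
    import io.debezium.relational.TableId;
    import io.debezium.relational.Tables;
    import io.debezium.relational.history.TableChanges;
    import org.apache.inlong.sort.cdc.base.util.ColumnFilterUtil;

    public class ColumnFilterUtilExample {
        public static void main(String[] args) {
            // Keep only test.table1.id and test.table1.name
            Configuration config = Configuration.create()
                    .with("column.include.list", "test.table1.id,test.table1.name")
                    .build();
            Tables.ColumnNameFilter filter =
                    ColumnFilterUtil.createColumnFilter(config, ColumnFilterMode.CATALOG);

            // A three-column table; "secret" is not in the include list
            Table table = Table.editor()
                    .tableId(new TableId("test", null, "table1"))
                    .addColumn(Column.editor().name("id").type("INT").create())
                    .addColumn(Column.editor().name("name").type("VARCHAR").create())
                    .addColumn(Column.editor().name("secret").type("VARCHAR").create())
                    .create();
            TableChanges.TableChange original =
                    new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);

            // The rebuilt TableChange only carries the included columns: [id, name]
            TableChanges.TableChange filtered =
                    ColumnFilterUtil.createTableChange(original, filter);
            System.out.println(filtered.getTable().columns());
        }
    }
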
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 47fd75d4e..852f542c2 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -164,8 +164,7 @@ public class MySqlSource<T>
                 new MySqlRecordEmitter<>(
                         deserializationSchema,
                         sourceReaderMetrics,
-                        sourceConfig.isIncludeSchemaChanges(),
-                        sourceConfig.isIncludeIncremental()),
+                        sourceConfig),
                 readerContext.getConfiguration(),
                 mySqlSourceReaderContext,
                 sourceConfig, sourceReaderMetrics);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index a8b6eb830..3ad89632f 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -21,7 +21,9 @@ import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.document.Array;
+import io.debezium.relational.ColumnFilterMode;
 import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.HistoryRecord.Fields;
 import io.debezium.relational.history.TableChanges;
@@ -32,6 +34,8 @@ import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.inlong.sort.cdc.base.util.ColumnFilterUtil;
+import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
 import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitState;
@@ -74,6 +78,7 @@ public final class MySqlRecordEmitter<T>
     private final MySqlSourceReaderMetrics sourceReaderMetrics;
     private final boolean includeSchemaChanges;
     private final OutputCollector<T> outputCollector;
+    private final Tables.ColumnNameFilter columnNameFilter;
     private volatile long binlogPos = 0L;
     private volatile long binlogFileNum = 0L;
     private volatile Boolean iSnapShot = false;
@@ -88,12 +93,14 @@ public final class MySqlRecordEmitter<T>
     public MySqlRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
             MySqlSourceReaderMetrics sourceReaderMetrics,
-            boolean includeSchemaChanges, boolean includeIncremental) {
+            MySqlSourceConfig sourceConfig) {
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceReaderMetrics = sourceReaderMetrics;
-        this.includeSchemaChanges = includeSchemaChanges;
+        this.includeSchemaChanges = sourceConfig.isIncludeSchemaChanges();
         this.outputCollector = new OutputCollector<>();
-        this.includeIncremental = includeIncremental;
+        this.includeIncremental = sourceConfig.isIncludeIncremental();
+        this.columnNameFilter = ColumnFilterUtil.createColumnFilter(
+                sourceConfig.getDbzConfiguration(), ColumnFilterMode.CATALOG);
     }
 
     @Override
@@ -114,7 +121,8 @@ public final class MySqlRecordEmitter<T>
             for (TableChange tableChange : changes) {
                 splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
                 if (includeSchemaChanges) {
-                    outputDdlElement(element, output, splitState, tableChange);
+                    TableChange newTableChange = ColumnFilterUtil.createTableChange(tableChange, columnNameFilter);
+                    outputDdlElement(element, output, splitState, newTableChange);
                 }
             }
 
@@ -153,6 +161,7 @@ public final class MySqlRecordEmitter<T>
 
             updateSnapshotRecord(element, splitState);
 
+            TableChange newTableChange = ColumnFilterUtil.createTableChange(tableSchema, columnNameFilter);
             debeziumDeserializationSchema.deserialize(
                     element,
                     new Collector<T>() {
@@ -173,7 +182,7 @@ public final class MySqlRecordEmitter<T>
                             // do nothing
                         }
                     },
-                    tableSchema);
+                    newTableChange);
         } else if (isHeartbeatEvent(element)) {
             updateStartingOffsetForSplit(splitState, element);
         } else {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
new file mode 100644
index 000000000..352a359c0
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.ExtractMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class AllMigrateWithSpecifyingFieldTest {
+
+    private MySqlExtractNode buildAllMigrateMySQLExtractNode() {
+
+        Map<String, String> option = new HashMap<>();
+        option.put("migrate-all", "true");
+        // Specify the required fields
+        option.put("debezium.column.include.list", "test.table1.id,test.table1.name");
+
+        List<String> tables = new ArrayList<>(10);
+        tables.add("test.table1");
+
+        List<FieldInfo> fields = Collections.singletonList(
+                new MetaFieldInfo("data", MetaField.DATA_BYTES_CANAL));
+
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, option, null,
+                tables, "localhost", "root", "inlong",
+                "test", 3306, null, true, null,
+                ExtractMode.CDC, null, null);
+    }
+
+    private MySqlLoadNode buildAllMigrateMySQLLoadNode() {
+        List<FieldInfo> fields = Collections.singletonList(
+                new FieldInfo("data", new VarBinaryFormatInfo()));
+        List<FieldRelation> relations = Collections.singletonList(
+                new FieldRelation(new FieldInfo("data", new VarBinaryFormatInfo()),
+                        new FieldInfo("data", new VarBinaryFormatInfo())));
+
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("sink.multiple.enable", "true");
+        properties.put("sink.multiple.format", "canal-json");
+        properties.put("sink.multiple.schema-update.policy", "try_it_best");
+        properties.put("sink.multiple.database-pattern", "test2");
+        properties.put("sink.multiple.schema-pattern", "");
+        // The target table naming rule
+        properties.put("sink.multiple.table-pattern", "${database}_${table}");
+
+        // In all-migrate mode, the table-name parameter value can be set arbitrarily
+        String table = "test2.xxx";
+        return new MySqlLoadNode("2", "mysql_output", fields, relations, null, null,
+                null, properties, "jdbc:mysql://localhost:3306", "root", "inlong",
+                table, null);
+    }
+
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Test all-migrate mode: synchronize the specified fields from MySQL to the target MySQL database
+     *
+     * @throws Exception The exception that may be thrown when executing the case
+     */
+    @Test
+    public void testAllMigrateMySQLToMySQL() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildAllMigrateMySQLExtractNode();
+        Node outputNode = buildAllMigrateMySQLLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
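
The exclude-list form works as well: ColumnFilterUtil falls back from column.include.list/column.exclude.list to the deprecated column.whitelist/column.blacklist keys, and the exclude filter applies whenever no include list is set. A hypothetical variant of the options above:

    Map<String, String> option = new HashMap<>();
    option.put("migrate-all", "true");
    // Synchronize every column of test.table1 except the listed ones
    option.put("debezium.column.exclude.list", "test.table1.secret");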