You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2023/04/10 03:04:09 UTC
[inlong] branch master updated: [INLONG-7693][Sort] MySQL CDC Connector supports specifying field synchronization (#7694)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f87204741 [INLONG-7693][Sort] MySQL CDC Connector supports specifying field synchronization (#7694)
f87204741 is described below
commit f872047417fefd3af0994872a4fae755c58c32f0
Author: chen.zs <34...@qq.com>
AuthorDate: Mon Apr 10 11:04:03 2023 +0800
[INLONG-7693][Sort] MySQL CDC Connector supports specifying field synchronization (#7694)
---
.../sort/cdc/base/util/ColumnFilterUtil.java | 69 +++++++++++
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 3 +-
.../mysql/source/reader/MySqlRecordEmitter.java | 19 +++-
.../parser/AllMigrateWithSpecifyingFieldTest.java | 126 +++++++++++++++++++++
4 files changed, 210 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ColumnFilterUtil.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ColumnFilterUtil.java
new file mode 100644
index 000000000..07aaa3a35
--- /dev/null
+++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ColumnFilterUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.ColumnFilterMode;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
+
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST;
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_WHITELIST;
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_EXCLUDE_LIST;
+import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_INCLUDE_LIST;
+
+/**
+ * Utility class that combines the column.exclude.list/column.blacklist and column.include.list/column.whitelist
+ * parameters into a column name filter (the include list wins when set) and uses it to generate a new TableChange.
+ */
+public class ColumnFilterUtil {
+
+ public static Tables.ColumnNameFilter createColumnFilter(
+ Configuration configuration, ColumnFilterMode columnFilterMode) {
+
+ String columnExcludeList = configuration.getFallbackStringProperty(COLUMN_EXCLUDE_LIST, COLUMN_BLACKLIST);
+ String columnIncludeList = configuration.getFallbackStringProperty(COLUMN_INCLUDE_LIST, COLUMN_WHITELIST);
+
+ Tables.ColumnNameFilter columnFilter;
+ if (columnIncludeList != null) {
+ columnFilter = Tables.ColumnNameFilterFactory.createIncludeListFilter(columnIncludeList, columnFilterMode);
+ } else {
+ columnFilter = Tables.ColumnNameFilterFactory.createExcludeListFilter(columnExcludeList, columnFilterMode);
+ }
+
+ return columnFilter;
+ }
+
+ public static TableChanges.TableChange createTableChange(
+ TableChanges.TableChange oldTableChange, Tables.ColumnNameFilter columnNameFilter) {
+ Table table = oldTableChange.getTable();
+ TableEditor tableEditor = table.edit();
+ table.columns()
+ .stream()
+ .filter(column -> !columnNameFilter.matches(
+ table.id().catalog(),
+ table.id().schema(),
+ table.id().table(),
+ column.name()))
+ .forEach(column -> tableEditor.removeColumn(column.name()));
+
+ return new TableChanges.TableChange(oldTableChange.getType(), tableEditor.create());
+ }
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 47fd75d4e..852f542c2 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -164,8 +164,7 @@ public class MySqlSource<T>
new MySqlRecordEmitter<>(
deserializationSchema,
sourceReaderMetrics,
- sourceConfig.isIncludeSchemaChanges(),
- sourceConfig.isIncludeIncremental()),
+ sourceConfig),
readerContext.getConfiguration(),
mySqlSourceReaderContext,
sourceConfig, sourceReaderMetrics);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index a8b6eb830..3ad89632f 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -21,7 +21,9 @@ import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.document.Array;
+import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.history.TableChanges;
@@ -32,6 +34,8 @@ import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.enums.ReadPhase;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.inlong.sort.cdc.base.util.ColumnFilterUtil;
+import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceConfig;
import org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSplitState;
@@ -74,6 +78,7 @@ public final class MySqlRecordEmitter<T>
private final MySqlSourceReaderMetrics sourceReaderMetrics;
private final boolean includeSchemaChanges;
private final OutputCollector<T> outputCollector;
+ private final Tables.ColumnNameFilter columnNameFilter;
private volatile long binlogPos = 0L;
private volatile long binlogFileNum = 0L;
private volatile Boolean iSnapShot = false;
@@ -88,12 +93,14 @@ public final class MySqlRecordEmitter<T>
public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
- boolean includeSchemaChanges, boolean includeIncremental) {
+ MySqlSourceConfig sourceConfig) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
- this.includeSchemaChanges = includeSchemaChanges;
+ this.includeSchemaChanges = sourceConfig.isIncludeSchemaChanges();
this.outputCollector = new OutputCollector<>();
- this.includeIncremental = includeIncremental;
+ this.includeIncremental = sourceConfig.isIncludeIncremental();
+ this.columnNameFilter = ColumnFilterUtil.createColumnFilter(
+ sourceConfig.getDbzConfiguration(), ColumnFilterMode.CATALOG);
}
@Override
@@ -114,7 +121,8 @@ public final class MySqlRecordEmitter<T>
for (TableChange tableChange : changes) {
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
if (includeSchemaChanges) {
- outputDdlElement(element, output, splitState, tableChange);
+ TableChange newTableChange = ColumnFilterUtil.createTableChange(tableChange, columnNameFilter);
+ outputDdlElement(element, output, splitState, newTableChange);
}
}
@@ -153,6 +161,7 @@ public final class MySqlRecordEmitter<T>
updateSnapshotRecord(element, splitState);
+ TableChange newTableChange = ColumnFilterUtil.createTableChange(tableSchema, columnNameFilter);
debeziumDeserializationSchema.deserialize(
element,
new Collector<T>() {
@@ -173,7 +182,7 @@ public final class MySqlRecordEmitter<T>
// do nothing
}
},
- tableSchema);
+ newTableChange);
} else if (isHeartbeatEvent(element)) {
updateStartingOffsetForSplit(splitState, element);
} else {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
new file mode 100644
index 000000000..352a359c0
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateWithSpecifyingFieldTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.ExtractMode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class AllMigrateWithSpecifyingFieldTest {
+
+ private MySqlExtractNode buildAllMigrateMySQLExtractNode() {
+
+ Map<String, String> option = new HashMap<>();
+ option.put("migrate-all", "true");
+ // Specify the fields to synchronize through the Debezium column include list
+ option.put("debezium.column.include.list", "test.table1.id,test.table1.name");
+
+ List<String> tables = new ArrayList<>(10);
+ tables.add("test.table1");
+
+ List<FieldInfo> fields = Collections.singletonList(
+ new MetaFieldInfo("data", MetaField.DATA_BYTES_CANAL));
+
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, option, null,
+ tables, "localhost", "root", "inlong",
+ "test", 3306, null, true, null,
+ ExtractMode.CDC, null, null);
+ }
+
+ private MySqlLoadNode buildAllMigrateMySQLLoadNode() {
+ List<FieldInfo> fields = Collections.singletonList(
+ new FieldInfo("data", new VarBinaryFormatInfo()));
+ List<FieldRelation> relations = Collections.singletonList(
+ new FieldRelation(new FieldInfo("data", new VarBinaryFormatInfo()),
+ new FieldInfo("data", new VarBinaryFormatInfo())));
+
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("sink.multiple.enable", "true");
+ properties.put("sink.multiple.format", "canal-json");
+ properties.put("sink.multiple.schema-update.policy", "try_it_best");
+ properties.put("sink.multiple.database-pattern", "test2");
+ properties.put("sink.multiple.schema-pattern", "");
+ // The naming rule for the target table
+ properties.put("sink.multiple.table-pattern", "${database}_${table}");
+
+ // In all-migrate mode, the table-name parameter value can be set arbitrarily
+ String table = "test2.xxx";
+ return new MySqlLoadNode("2", "mysql_output", fields, relations, null, null,
+ null, properties, "jdbc:mysql://localhost:3306", "root", "inlong",
+ table, null);
+ }
+
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ /**
+ * Tests all-migrate mode: synchronizes the specified fields from MySQL to the target MySQL database.
+ *
+ * @throws Exception if an error occurs while executing the test case
+ */
+ @Test
+ public void testAllMigrateMySQLToMySQL() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildAllMigrateMySQLExtractNode();
+ Node outputNode = buildAllMigrateMySQLLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+}