You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/19 12:34:58 UTC
[inlong] branch master updated: [INLONG-6152][Sort] MySQL connector support filtering kinds of row data (#6173)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bedbb5106 [INLONG-6152][Sort] MySQL connector support filtering kinds of row data (#6173)
bedbb5106 is described below
commit bedbb510641dd0e383954c5282ec656e79f81c47
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Oct 19 20:34:52 2022 +0800
[INLONG-6152][Sort] MySQL connector support filtering kinds of row data (#6173)
---
.../apache/inlong/common/enums/RowKindEnum.java | 108 +++++++++++++++++++++
.../protocol/node/extract/MySqlExtractNode.java | 17 +++-
.../inlong/sort/base/filter/RowKindValidator.java | 70 +++++++++++++
.../inlong/sort/base/filter/RowValidator.java | 31 ++++++
.../sort/base/validator/TestRowValidator.java | 48 +++++++++
.../table/RowDataDebeziumDeserializeSchema.java | 35 +++----
.../mysql/source/config/MySqlSourceOptions.java | 11 +++
.../cdc/mysql/table/MySqlReadableMetadata.java | 5 +-
.../mysql/table/MySqlTableInlongSourceFactory.java | 7 +-
.../sort/cdc/mysql/table/MySqlTableSource.java | 22 +++--
.../apache/inlong/sort/parser/AllMigrateTest.java | 24 +++--
11 files changed, 335 insertions(+), 43 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
new file mode 100644
index 000000000..296c86807
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Kind of change carried by a row in a changelog, mirroring the row kinds provided by Flink.
+ *
+ * <p>Every constant has a short string form ({@link #shortString()}) and a byte form
+ * ({@link #toByteValue()}).
+ */
+public enum RowKindEnum {
+
+    /** Insertion operation ("+I", byte value 0). */
+    INSERT("+I", (byte) 0),
+
+    /** Update operation with the previous content of the updated row ("-U", byte value 1). */
+    UPDATE_BEFORE("-U", (byte) 1),
+
+    /** Update operation with the new content of the updated row ("+U", byte value 2). */
+    UPDATE_AFTER("+U", (byte) 2),
+
+    /** Deletion operation ("-D", byte value 3). */
+    DELETE("-D", (byte) 3);
+
+    /** Short string form, e.g. "+I". */
+    private final String shortString;
+
+    /** Byte form used for serialization and deserialization. */
+    private final byte value;
+
+    RowKindEnum(String shortString, byte value) {
+        this.shortString = shortString;
+        this.value = value;
+    }
+
+    /**
+     * Returns the short string representation of this row kind: "+I", "-U", "+U" or "-D".
+     */
+    public String shortString() {
+        return shortString;
+    }
+
+    /**
+     * Returns the byte value representation of this row kind: 0 for {@link #INSERT},
+     * 1 for {@link #UPDATE_BEFORE}, 2 for {@link #UPDATE_AFTER} and 3 for {@link #DELETE}.
+     */
+    public byte toByteValue() {
+        return value;
+    }
+
+    /**
+     * Looks up the row kind with the given byte value.
+     *
+     * @param value byte value as returned by {@link #toByteValue()}
+     * @return the row kind whose byte value equals {@code value}
+     * @throws UnsupportedOperationException if no row kind has that byte value
+     */
+    public static RowKindEnum fromByteValue(byte value) {
+        for (RowKindEnum candidate : values()) {
+            if (candidate.value == value) {
+                return candidate;
+            }
+        }
+        throw new UnsupportedOperationException(
+                "Unsupported byte value '" + value + "' for row kind.");
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index 1f590a3ab..e561610b8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.protocol.node.extract;
import com.google.common.base.Preconditions;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.StringUtils;
@@ -27,6 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.common.enums.RowKindEnum;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.Metadata;
@@ -79,6 +81,8 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
private ExtractMode extractMode;
@JsonProperty("url")
private String url;
+ @JsonProperty("rowKindsFiltered")
+ private List<RowKindEnum> rowKindsFiltered;
/**
* Constructor only used for {@link ExtractMode#CDC}
@@ -116,7 +120,7 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
@Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
@Nullable @JsonProperty("serverTimeZone") String serverTimeZone) {
this(id, name, fields, watermarkField, properties, primaryKey, tableNames, hostname, username, password,
- database, port, serverId, incrementalSnapshotEnabled, serverTimeZone, ExtractMode.CDC, null);
+ database, port, serverId, incrementalSnapshotEnabled, serverTimeZone, ExtractMode.CDC, null, null);
}
/**
@@ -143,7 +147,7 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
@Nullable @JsonProperty("url") String url) {
this(id, name, fields, null, properties, primaryKey, tableNames, null, username,
password, null, null, null, null, null,
- ExtractMode.SCAN, url);
+ ExtractMode.SCAN, url, null);
}
/**
@@ -185,7 +189,8 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
@Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
@Nullable @JsonProperty("serverTimeZone") String serverTimeZone,
@Nonnull @JsonProperty("extractMode") ExtractMode extractMode,
- @Nullable @JsonProperty("url") String url) {
+ @Nullable @JsonProperty("url") String url,
+ @Nullable @JsonProperty("rowKindsFiltered") List<RowKindEnum> rowKindsFiltered) {
super(id, name, fields, watermarkField, properties);
this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames is null");
Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
@@ -206,6 +211,7 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
this.incrementalSnapshotEnabled = incrementalSnapshotEnabled;
this.serverTimeZone = serverTimeZone;
this.extractMode = extractMode;
+ this.rowKindsFiltered = rowKindsFiltered;
}
@Override
@@ -225,6 +231,11 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
options.put("connector", "mysql-cdc-inlong");
options.put("hostname", hostname);
options.put("database-name", database);
+ if (rowKindsFiltered != null) {
+ List<String> rowKinds = rowKindsFiltered.stream().map(RowKindEnum::shortString)
+ .collect(Collectors.toList());
+ options.put("row-kinds-filtered", StringUtils.join(rowKinds, "&"));
+ }
if (port != null) {
options.put("port", port.toString());
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowKindValidator.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowKindValidator.java
new file mode 100644
index 000000000..14a047d82
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowKindValidator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.filter;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Row kind validator: only rows whose {@link RowKind} is one of the configured kinds are valid.
+ *
+ * <p>Row kinds are given by their Flink short strings:
+ * <ul>
+ *   <li>"+I" represents INSERT.
+ *   <li>"-U" represents UPDATE_BEFORE.
+ *   <li>"+U" represents UPDATE_AFTER.
+ *   <li>"-D" represents DELETE.
+ * </ul>
+ */
+public class RowKindValidator implements RowValidator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Syntax of the string form: one or more short strings joined by {@code &}, e.g. {@code +I&-D}. */
+    private static final String ROW_KINDS_REGEX = "(\\+I|\\+U|-U|-D)(&(\\+I|\\+U|-U|-D))*";
+
+    /** Compiled once rather than on every constructor call. */
+    private static final Pattern ROW_KINDS_PATTERN = Pattern.compile(ROW_KINDS_REGEX);
+
+    /** Row kinds that pass validation; every other kind is rejected. */
+    private final Set<RowKind> rowKindsFiltered = EnumSet.noneOf(RowKind.class);
+
+    /**
+     * Creates a validator from a list of short strings, e.g. {@code ["+I", "-D"]}.
+     *
+     * @param rowKinds non-empty list of row kind short strings
+     * @throws NullPointerException if {@code rowKinds} is null
+     * @throws IllegalArgumentException if the list is empty or contains an unsupported value
+     */
+    public RowKindValidator(List<String> rowKinds) {
+        Preconditions.checkNotNull(rowKinds, "rowKinds should not be null");
+        Preconditions.checkArgument(!rowKinds.isEmpty(),
+                "rowKinds should not be empty");
+        for (String rowKind : rowKinds) {
+            // Unknown values used to be skipped silently, which quietly dropped every row of
+            // the intended kind; reject them as the String constructor does.
+            rowKindsFiltered.add(toRowKind(rowKind).orElseThrow(() -> new IllegalArgumentException(
+                    String.format("rowKind is not valid, supported values are +I, -U, +U and -D,"
+                            + " the input value is %s", rowKind))));
+        }
+    }
+
+    /**
+     * Creates a validator from short strings joined by {@code &}, e.g. {@code +I&-D}.
+     *
+     * @param rowKinds row kinds string such as {@code +I&+U&-U}
+     * @throws NullPointerException if {@code rowKinds} is null
+     * @throws IllegalArgumentException if {@code rowKinds} does not match the expected syntax
+     */
+    public RowKindValidator(String rowKinds) {
+        Preconditions.checkNotNull(rowKinds, "rowKinds should not be null");
+        // The message template is only formatted when the check fails.
+        Preconditions.checkArgument(ROW_KINDS_PATTERN.matcher(rowKinds).matches(),
+                "rowKinds is not valid, should match the pattern %s, the input value is %s",
+                ROW_KINDS_REGEX, rowKinds);
+        for (String rowKind : rowKinds.split(DELIMITER)) {
+            toRowKind(rowKind).ifPresent(rowKindsFiltered::add);
+        }
+    }
+
+    @Override
+    public boolean validate(RowKind rowKind) {
+        return rowKindsFiltered.contains(rowKind);
+    }
+
+    /**
+     * Maps a Flink row kind short string such as "+I" to its {@link RowKind}.
+     *
+     * @param shortString candidate short string
+     * @return the matching row kind, or empty if {@code shortString} is not a known short string
+     */
+    private static Optional<RowKind> toRowKind(String shortString) {
+        return Arrays.stream(RowKind.values())
+                .filter(kind -> kind.shortString().equals(shortString))
+                .findFirst();
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowValidator.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowValidator.java
new file mode 100644
index 000000000..919d6b6d4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowValidator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.filter;
+
+import java.io.Serializable;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Decides whether a row is kept, based on its {@link RowKind}.
+ *
+ * <p>Extends {@link Serializable} so that implementations can be held as fields of
+ * serializable deserialization schemas.
+ */
+@FunctionalInterface
+public interface RowValidator extends Serializable {
+
+    /**
+     * Checks whether a row of the given kind is valid.
+     *
+     * @param rowKind kind of the row being checked
+     * @return {@code true} if the row should be kept, {@code false} if it should be dropped
+     */
+    boolean validate(RowKind rowKind);
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/validator/TestRowValidator.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/validator/TestRowValidator.java
new file mode 100644
index 000000000..ed02f2fda
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/validator/TestRowValidator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.validator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.filter.RowKindValidator;
+import org.apache.inlong.sort.base.filter.RowValidator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link RowKindValidator}, covering both the String and the List constructors.
+ */
+public class TestRowValidator {
+
+    @Test
+    public void testRowKindValidator() {
+        RowValidator rowKindValidator = new RowKindValidator("+I&+U&-U");
+        Assert.assertTrue(rowKindValidator.validate(RowKind.INSERT));
+        Assert.assertFalse(rowKindValidator.validate(RowKind.DELETE));
+        Assert.assertTrue(rowKindValidator.validate(RowKind.UPDATE_AFTER));
+        Assert.assertTrue(rowKindValidator.validate(RowKind.UPDATE_BEFORE));
+    }
+
+    @Test
+    public void testRowKindValidatorFromList() {
+        RowValidator rowKindValidator = new RowKindValidator(Arrays.asList("+I", "-D"));
+        Assert.assertTrue(rowKindValidator.validate(RowKind.INSERT));
+        Assert.assertTrue(rowKindValidator.validate(RowKind.DELETE));
+        Assert.assertFalse(rowKindValidator.validate(RowKind.UPDATE_BEFORE));
+        Assert.assertFalse(rowKindValidator.validate(RowKind.UPDATE_AFTER));
+    }
+
+    @Test
+    public void testRowKindsString() {
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator("+I&+U&-"));
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator("+I&-I"));
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator("+I&+U&"));
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator(""));
+    }
+
+    @Test
+    public void testEmptyRowKindsList() {
+        Assert.assertThrows(IllegalArgumentException.class,
+                () -> new RowKindValidator(Collections.emptyList()));
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index a6f5c8da6..63b0bb019 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.filter.RowValidator;
import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.debezium.utils.TemporalConversions;
import org.apache.kafka.connect.data.ConnectSchema;
@@ -51,7 +52,6 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
@@ -109,7 +109,7 @@ public final class RowDataDebeziumDeserializeSchema
/**
* Validator to validate the row value.
*/
- private final ValueValidator validator;
+ private final RowValidator rowKindValidator;
private boolean migrateAll;
@@ -119,7 +119,7 @@ public final class RowDataDebeziumDeserializeSchema
RowType physicalDataType,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> resultTypeInfo,
- ValueValidator validator,
+ RowValidator rowValidator,
ZoneId serverTimeZone,
boolean appendSource,
DeserializationRuntimeConverterFactory userDefinedConverterFactory,
@@ -134,7 +134,7 @@ public final class RowDataDebeziumDeserializeSchema
serverTimeZone,
userDefinedConverterFactory);
this.resultTypeInfo = checkNotNull(resultTypeInfo);
- this.validator = checkNotNull(validator);
+ this.rowKindValidator = rowValidator;
this.appendSource = checkNotNull(appendSource);
}
@@ -623,6 +623,9 @@ public final class RowDataDebeziumDeserializeSchema
* @return
*/
private Object getTimeValue(Object fieldValue, String schemaName) {
+ if (fieldValue == null) {
+ return null;
+ }
switch (schemaName) {
case MicroTime.SCHEMA_NAME:
Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
@@ -661,24 +664,20 @@ public final class RowDataDebeziumDeserializeSchema
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
- validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
emit(record, insert, tableSchema, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
- validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, tableSchema, out);
} else {
if (!appendSource) {
GenericRowData before = extractBeforeRow(value, valueSchema);
- validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, before, tableSchema, out);
}
GenericRowData after = extractAfterRow(value, valueSchema);
- validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
emit(record, after, tableSchema, out);
}
@@ -699,6 +698,9 @@ public final class RowDataDebeziumDeserializeSchema
private void emit(SourceRecord inRecord, RowData physicalRow,
TableChange tableChange, Collector<RowData> collector
) {
+ if (!rowKindValidator.validate(physicalRow.getRowKind())) {
+ return;
+ }
if (appendSource) {
physicalRow.setRowKind(RowKind.INSERT);
}
@@ -717,14 +719,6 @@ public final class RowDataDebeziumDeserializeSchema
return resultTypeInfo;
}
- /**
- * Custom validator to validate the row value.
- */
- public interface ValueValidator extends Serializable {
-
- void validate(RowData rowData, RowKind rowKind) throws Exception;
- }
-
/**
* Builder of {@link RowDataDebeziumDeserializeSchema}.
*/
@@ -733,8 +727,7 @@ public final class RowDataDebeziumDeserializeSchema
private RowType physicalRowType;
private TypeInformation<RowData> resultTypeInfo;
private MetadataConverter[] metadataConverters = new MetadataConverter[0];
- private ValueValidator validator = (rowData, rowKind) -> {
- };
+ private RowValidator rowValidator = rowKind -> true; // keep every row kind unless setValidator is called
private ZoneId serverTimeZone = ZoneId.of("UTC");
private boolean appendSource = false;
private boolean migrateAll = false;
@@ -761,8 +754,8 @@ public final class RowDataDebeziumDeserializeSchema
return this;
}
- public Builder setValueValidator(ValueValidator validator) {
- this.validator = validator;
+ public Builder setValidator(RowValidator rowFilter) {
+ this.rowValidator = rowFilter;
return this;
}
@@ -787,7 +780,7 @@ public final class RowDataDebeziumDeserializeSchema
physicalRowType,
metadataConverters,
resultTypeInfo,
- validator,
+ rowValidator,
serverTimeZone,
appendSource,
userDefinedConverterFactory,
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 8a211cf19..5093583fe 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -184,6 +184,17 @@ public class MySqlSourceOptions {
"Optional interval of sending heartbeat event for tracing the "
+ "latest available binlog offsets");
+ public static final ConfigOption<String> ROW_KINDS_FILTERED =
+ ConfigOptions.key("row-kinds-filtered")
+ .stringType()
+ .defaultValue("+I&-U&+U&-D")
+ .withDescription("row kinds to be filtered,"
+ + " here filtered means keep the data of certain row kinds,"
+ + " the format follows rowKind1&rowKind2, supported row kinds are:\n"
+ + "\"+I\" represents INSERT.\n"
+ + "\"-U\" represents UPDATE_BEFORE.\n"
+ + "\"+U\" represents UPDATE_AFTER.\n"
+ + "\"-D\" represents DELETE.");
public static final ConfigOption<Boolean> APPEND_MODE =
ConfigOptions.key("append-mode")
.booleanType()
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 9af1bd735..5f79b4980 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -23,6 +23,7 @@ import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
+import java.util.LinkedHashMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericArrayData;
@@ -412,7 +413,7 @@ public enum MySqlReadableMetadata {
if (tableSchema == null) {
return null;
}
- Map<String, String> mysqlType = new HashMap<>();
+ Map<String, String> mysqlType = new LinkedHashMap<>();
final Table table = tableSchema.getTable();
table.columns()
.forEach(
@@ -436,7 +437,7 @@ public enum MySqlReadableMetadata {
if (tableSchema == null) {
return null;
}
- Map<String, Integer> sqlType = new HashMap<>();
+ Map<String, Integer> sqlType = new LinkedHashMap<>();
final Table table = tableSchema.getTable();
table.columns().forEach(
column -> sqlType.put(column.name(), column.jdbcType())
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index b4610d959..ac4441362 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -51,6 +51,7 @@ import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
+import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.ROW_KINDS_FILTERED;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
@@ -148,7 +149,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
-
+ final String rowKindFiltered = config.get(ROW_KINDS_FILTERED).isEmpty()
+ ? ROW_KINDS_FILTERED.defaultValue() : config.get(ROW_KINDS_FILTERED);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema);
@@ -189,7 +191,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
heartbeatInterval,
migrateAll,
inlongMetric,
- inlongAudit);
+ inlongAudit, rowKindFiltered);
}
@Override
@@ -233,6 +235,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
options.add(HEARTBEAT_INTERVAL);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
+ options.add(ROW_KINDS_FILTERED);
return options;
}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 621f122f6..9b6ca23e7 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.debezium.DebeziumSourceFunction;
import org.apache.inlong.sort.cdc.debezium.table.MetadataConverter;
import org.apache.inlong.sort.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.base.filter.RowKindValidator;
import org.apache.inlong.sort.cdc.mysql.source.MySqlSource;
import javax.annotation.Nullable;
@@ -78,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final StartupOptions startupOptions;
private final boolean appendSource;
private final boolean scanNewlyAddedTableEnabled;
+ private final String rowKindsFiltered;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
private final boolean migrateAll;
@@ -125,7 +127,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
Duration heartbeatInterval,
boolean migrateAll,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ String rowKindsFiltered) {
this(
physicalSchema,
port,
@@ -153,7 +156,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
heartbeatInterval,
migrateAll,
inlongMetric,
- inlongAudit);
+ inlongAudit,
+ rowKindsFiltered);
}
/**
@@ -186,7 +190,9 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
Duration heartbeatInterval,
boolean migrateAll,
String inlongMetric,
- String inlongAudit) {
+ String inlongAudit,
+ String rowKindsFiltered
+ ) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -217,6 +223,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.migrateAll = migrateAll;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.rowKindsFiltered = rowKindsFiltered;
}
@Override
@@ -246,8 +253,9 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.setResultTypeInfo(typeInfo)
.setServerTimeZone(serverTimeZone)
.setAppendSource(appendSource)
+ .setValidator(new RowKindValidator(rowKindsFiltered))
.setUserDefinedConverterFactory(
- MySqlDeserializationConverterFactory.instance())
+ MySqlDeserializationConverterFactory.instance())
.setMigrateAll(migrateAll)
.build();
if (enableParallelRead) {
@@ -256,7 +264,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.hostname(hostname)
.port(port)
.databaseList(database)
- .tableList(database + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
@@ -285,7 +293,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.hostname(hostname)
.port(port)
.databaseList(database)
- .tableList(database + "." + tableName)
+ .tableList(tableName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
@@ -366,7 +374,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
heartbeatInterval,
migrateAll,
inlongMetric,
- inlongAudit);
+ inlongAudit, rowKindsFiltered);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index bd3501a61..2e4f45e6d 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.parser;
+import java.util.ArrayList;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -29,6 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.ExtractMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -48,17 +50,22 @@ import java.util.stream.Collectors;
public class AllMigrateTest {
private MySqlExtractNode buildAllMigrateExtractNode() {
- List<FieldInfo> fields = Collections.singletonList(
- new MetaFieldInfo("data", MetaField.DATA));
+
Map<String, String> option = new HashMap<>();
option.put("append-mode", "true");
option.put("migrate-all", "true");
+ List<String> tables = new ArrayList<>(10);
+ tables.add("test.*");
+ List<FieldInfo> fields = Collections.singletonList(
+ new MetaFieldInfo("data", MetaField.DATA));
+
return new MySqlExtractNode("1", "mysql_input", fields,
null, option, null,
- Collections.singletonList("test"), "localhost", "root", "inlong",
- "test", null, null, false, null);
+ tables, "localhost", "root", "inlong",
+ "test", null, null, false, null,
+ ExtractMode.CDC, null, null);
}
-
+
private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
List<FieldInfo> fields = Collections.singletonList(
new MetaFieldInfo("data", MetaField.DATA_BYTES));
@@ -68,9 +75,10 @@ public class AllMigrateTest {
return new MySqlExtractNode("1", "mysql_input", fields,
null, option, null,
Collections.singletonList("test"), "localhost", "root", "inlong",
- "test", null, null, false, null);
+ "test", null, null, false, "UTC-8", ExtractMode.CDC,
+ null, null);
}
-
+
private KafkaLoadNode buildAllMigrateKafkaNodeWithBytesFormat() {
List<FieldInfo> fields = Collections.singletonList(
new FieldInfo("data", new VarBinaryFormatInfo()));
@@ -84,7 +92,7 @@ public class AllMigrateTest {
csvFormat, null,
null, null);
}
-
+
private KafkaLoadNode buildAllMigrateKafkaNode() {
List<FieldInfo> fields = Collections.singletonList(
new FieldInfo("data", new StringFormatInfo()));