Posted to commits@inlong.apache.org by zi...@apache.org on 2022/10/19 12:34:58 UTC

[inlong] branch master updated: [INLONG-6152][Sort] MySQL connector support filtering kinds of row data (#6173)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bedbb5106 [INLONG-6152][Sort] MySQL connector support filtering kinds of row data (#6173)
bedbb5106 is described below

commit bedbb510641dd0e383954c5282ec656e79f81c47
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Oct 19 20:34:52 2022 +0800

    [INLONG-6152][Sort] MySQL connector support filtering kinds of row data (#6173)
---
 .../apache/inlong/common/enums/RowKindEnum.java    | 108 +++++++++++++++++++++
 .../protocol/node/extract/MySqlExtractNode.java    |  17 +++-
 .../inlong/sort/base/filter/RowKindValidator.java  |  70 +++++++++++++
 .../inlong/sort/base/filter/RowValidator.java      |  31 ++++++
 .../sort/base/validator/TestRowValidator.java      |  48 +++++++++
 .../table/RowDataDebeziumDeserializeSchema.java    |  35 +++----
 .../mysql/source/config/MySqlSourceOptions.java    |  11 +++
 .../cdc/mysql/table/MySqlReadableMetadata.java     |   5 +-
 .../mysql/table/MySqlTableInlongSourceFactory.java |   7 +-
 .../sort/cdc/mysql/table/MySqlTableSource.java     |  22 +++--
 .../apache/inlong/sort/parser/AllMigrateTest.java  |  24 +++--
 11 files changed, 335 insertions(+), 43 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
new file mode 100644
index 000000000..296c86807
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+/**
+ * Row kind enum, mirroring the row kinds provided by Flink.
+ */
+public enum RowKindEnum {
+
+    /** Insertion operation. */
+    INSERT("+I", (byte) 0),
+
+    /**
+     * Update operation with the previous content of the updated row.
+     */
+    UPDATE_BEFORE("-U", (byte) 1),
+
+    /**
+     * Update operation with new content of the updated row.
+     *
+     */
+    UPDATE_AFTER("+U", (byte) 2),
+
+    /** Deletion operation. */
+    DELETE("-D", (byte) 3);
+
+    private final String shortString;
+
+    private final byte value;
+
+    /**
+     * Creates a {@link RowKindEnum} enum with the given short string and byte value representation of
+     * the {@link RowKindEnum}.
+     */
+    RowKindEnum(String shortString, byte value) {
+        this.shortString = shortString;
+        this.value = value;
+    }
+
+    /**
+     * Returns a short string representation of this {@link RowKindEnum}.
+     *
+     * <ul>
+     *   <li>"+I" represents {@link #INSERT}.
+     *   <li>"-U" represents {@link #UPDATE_BEFORE}.
+     *   <li>"+U" represents {@link #UPDATE_AFTER}.
+     *   <li>"-D" represents {@link #DELETE}.
+     * </ul>
+     */
+    public String shortString() {
+        return shortString;
+    }
+
+    /**
+     * Returns the byte value representation of this {@link RowKindEnum}. The byte value is used for
+     * serialization and deserialization.
+     *
+     * <p>
+     *
+     * <ul>
+     *   <li>"0" represents {@link #INSERT}.
+     *   <li>"1" represents {@link #UPDATE_BEFORE}.
+     *   <li>"2" represents {@link #UPDATE_AFTER}.
+     *   <li>"3" represents {@link #DELETE}.
+     * </ul>
+     */
+    public byte toByteValue() {
+        return value;
+    }
+
+    /**
+     * Creates a {@link RowKindEnum} from the given byte value. Each {@link RowKindEnum} has a byte value
+     * representation.
+     *
+     * @see #toByteValue() for mapping of byte value and {@link RowKindEnum}.
+     */
+    public static RowKindEnum fromByteValue(byte value) {
+        switch (value) {
+            case 0:
+                return INSERT;
+            case 1:
+                return UPDATE_BEFORE;
+            case 2:
+                return UPDATE_AFTER;
+            case 3:
+                return DELETE;
+            default:
+                throw new UnsupportedOperationException(
+                    "Unsupported byte value '" + value + "' for row kind.");
+        }
+    }
+
+}
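
For reference, the new enum round-trips between its short-string and byte-value forms. A minimal sketch using only the methods added above (the class and main method are illustrative):

import org.apache.inlong.common.enums.RowKindEnum;

public class RowKindEnumExample {

    public static void main(String[] args) {
        // Byte value 2 and short string "+U" both identify UPDATE_AFTER.
        RowKindEnum updateAfter = RowKindEnum.fromByteValue((byte) 2);
        System.out.println(updateAfter.shortString()); // "+U"
        System.out.println(updateAfter.toByteValue()); // 2

        // Unknown byte values are rejected.
        try {
            RowKindEnum.fromByteValue((byte) 9);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // Unsupported byte value '9' for row kind.
        }
    }
}
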
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index 1f590a3ab..e561610b8 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
+import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -27,6 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.common.enums.RowKindEnum;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
 import org.apache.inlong.sort.protocol.Metadata;
@@ -79,6 +81,8 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
     private ExtractMode extractMode;
     @JsonProperty("url")
     private String url;
+    @JsonProperty("rowKindsFiltered")
+    private List<RowKindEnum> rowKindsFiltered;
 
     /**
      * Constructor only used for {@link ExtractMode#CDC}
@@ -116,7 +120,7 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
             @Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
             @Nullable @JsonProperty("serverTimeZone") String serverTimeZone) {
         this(id, name, fields, watermarkField, properties, primaryKey, tableNames, hostname, username, password,
-                database, port, serverId, incrementalSnapshotEnabled, serverTimeZone, ExtractMode.CDC, null);
+                database, port, serverId, incrementalSnapshotEnabled, serverTimeZone, ExtractMode.CDC, null, null);
     }
 
     /**
@@ -143,7 +147,7 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
             @Nullable @JsonProperty("url") String url) {
         this(id, name, fields, null, properties, primaryKey, tableNames, null, username,
                 password, null, null, null, null, null,
-                ExtractMode.SCAN, url);
+                ExtractMode.SCAN, url, null);
     }
 
     /**
@@ -185,7 +189,8 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
             @Nullable @JsonProperty("incrementalSnapshotEnabled") Boolean incrementalSnapshotEnabled,
             @Nullable @JsonProperty("serverTimeZone") String serverTimeZone,
             @Nonnull @JsonProperty("extractMode") ExtractMode extractMode,
-            @Nullable @JsonProperty("url") String url) {
+            @Nullable @JsonProperty("url") String url,
+            @Nullable @JsonProperty("rowKindsFiltered") List<RowKindEnum> rowKindsFiltered) {
         super(id, name, fields, watermarkField, properties);
         this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames is null");
         Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
@@ -206,6 +211,7 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
         this.incrementalSnapshotEnabled = incrementalSnapshotEnabled;
         this.serverTimeZone = serverTimeZone;
         this.extractMode = extractMode;
+        this.rowKindsFiltered = rowKindsFiltered;
     }
 
     @Override
@@ -225,6 +231,11 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
             options.put("connector", "mysql-cdc-inlong");
             options.put("hostname", hostname);
             options.put("database-name", database);
+            if (rowKindsFiltered != null) {
+                List<String> rowKinds = rowKindsFiltered.stream().map(RowKindEnum::shortString)
+                    .collect(Collectors.toList());
+                options.put("row-kinds-filtered", StringUtils.join(rowKinds, "&"));
+            }
             if (port != null) {
                 options.put("port", port.toString());
             }
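
The new rowKindsFiltered property is serialized into the row-kinds-filtered connector option by joining the enum short strings with '&'. A small standalone sketch of that transformation (the wrapping class is hypothetical; the mapping itself mirrors the change above):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.RowKindEnum;

public class RowKindsOptionSketch {

    public static void main(String[] args) {
        // Same mapping as performed when building the table options.
        List<RowKindEnum> rowKindsFiltered = Arrays.asList(RowKindEnum.INSERT, RowKindEnum.DELETE);
        List<String> rowKinds = rowKindsFiltered.stream()
                .map(RowKindEnum::shortString)
                .collect(Collectors.toList());
        // Value of the "row-kinds-filtered" option: "+I&-D"
        System.out.println(StringUtils.join(rowKinds, "&"));
    }
}
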
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowKindValidator.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowKindValidator.java
new file mode 100644
index 000000000..14a047d82
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowKindValidator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.filter;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Row kind validator: only the specified row kinds are considered valid.
+ * Supported row kinds are:
+ *
+ * "+I" represents INSERT.
+ * "-U" represents UPDATE_BEFORE.
+ * "+U" represents UPDATE_AFTER.
+ * "-D" represents DELETE.
+ *
+ */
+public class RowKindValidator implements RowValidator {
+
+    private final Set<RowKind> rowKindsFiltered = new HashSet<>();
+
+    private static final String pattern = "(\\+I|\\+U|-U|-D)(&(\\+I|\\+U|-U|-D))*";
+
+    public RowKindValidator(List<String> rowKinds) {
+        Preconditions.checkArgument(!rowKinds.isEmpty(),
+            "rowKinds should not be empty");
+        for (String rowKind : rowKinds) {
+            Arrays.stream(RowKind.values()).filter(value -> value.shortString().equals(rowKind))
+                .findFirst().ifPresent(rowKindsFiltered::add);
+        }
+    }
+
+    public RowKindValidator(String rowKinds) {
+        Preconditions.checkArgument(Pattern.matches(pattern, rowKinds),
+             String.format("rowKinds is not valid, should match the pattern %s,"
+                 + " the input value is %s", pattern, rowKinds));
+        for (String rowKind : rowKinds.split(DELIMITER)) {
+            Arrays.stream(RowKind.values()).filter(value -> value.shortString().equals(rowKind))
+                .findFirst().ifPresent(rowKindsFiltered::add);
+        }
+    }
+
+    @Override
+    public boolean validate(RowKind rowKind) {
+        return rowKindsFiltered.contains(rowKind);
+    }
+}
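
A usage sketch of the validator's two constructors; the '&'-delimited string form is the same one exercised by the unit test further below:

import java.util.Arrays;
import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.base.filter.RowKindValidator;

public class RowKindValidatorSketch {

    public static void main(String[] args) {
        // String form: keep only inserts and update-after rows.
        RowKindValidator fromString = new RowKindValidator("+I&+U");
        System.out.println(fromString.validate(RowKind.INSERT));        // true
        System.out.println(fromString.validate(RowKind.UPDATE_BEFORE)); // false

        // List form: built from the same short strings.
        RowKindValidator fromList = new RowKindValidator(Arrays.asList("+I", "-D"));
        System.out.println(fromList.validate(RowKind.DELETE));          // true
    }
}
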
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowValidator.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowValidator.java
new file mode 100644
index 000000000..919d6b6d4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/filter/RowValidator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.filter;
+
+import java.io.Serializable;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Validator for RowData
+ */
+public interface RowValidator extends Serializable {
+
+    boolean validate(RowKind rowKind);
+
+}
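
Since the interface only requires validate(RowKind), other filtering policies can be plugged in. An illustrative custom implementation, shown only as a sketch:

import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.base.filter.RowValidator;

/** Illustrative validator that keeps only INSERT rows. */
public class InsertOnlyRowValidator implements RowValidator {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean validate(RowKind rowKind) {
        return rowKind == RowKind.INSERT;
    }
}
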
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/validator/TestRowValidator.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/validator/TestRowValidator.java
new file mode 100644
index 000000000..ed02f2fda
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/validator/TestRowValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.validator;
+
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.filter.RowKindValidator;
+import org.apache.inlong.sort.base.filter.RowValidator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test row kind validator
+ */
+public class TestRowValidator {
+
+    @Test
+    public void testRowKindValidator() {
+        RowValidator rowKindValidator = new RowKindValidator("+I&+U&-U");
+        Assert.assertTrue(rowKindValidator.validate(RowKind.INSERT));
+        Assert.assertFalse(rowKindValidator.validate(RowKind.DELETE));
+        Assert.assertTrue(rowKindValidator.validate(RowKind.UPDATE_AFTER));
+        Assert.assertTrue(rowKindValidator.validate(RowKind.UPDATE_BEFORE));
+    }
+
+    @Test
+    public void testRowKindsString() {
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator("+I&+U&-"));
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator("+I&-I"));
+        Assert.assertThrows(IllegalArgumentException.class, () -> new RowKindValidator("+I&+U&"));
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index a6f5c8da6..63b0bb019 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.filter.RowValidator;
 import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.debezium.utils.TemporalConversions;
 import org.apache.kafka.connect.data.ConnectSchema;
@@ -51,7 +52,6 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
@@ -109,7 +109,7 @@ public final class RowDataDebeziumDeserializeSchema
     /**
      * Validator to validate the row value.
      */
-    private final ValueValidator validator;
+    private final RowValidator rowKindValidator;
 
     private boolean migrateAll;
 
@@ -119,7 +119,7 @@ public final class RowDataDebeziumDeserializeSchema
             RowType physicalDataType,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> resultTypeInfo,
-            ValueValidator validator,
+            RowValidator rowValidator,
             ZoneId serverTimeZone,
             boolean appendSource,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory,
@@ -134,7 +134,7 @@ public final class RowDataDebeziumDeserializeSchema
                         serverTimeZone,
                         userDefinedConverterFactory);
         this.resultTypeInfo = checkNotNull(resultTypeInfo);
-        this.validator = checkNotNull(validator);
+        this.rowKindValidator = rowValidator;
         this.appendSource = checkNotNull(appendSource);
     }
 
@@ -623,6 +623,9 @@ public final class RowDataDebeziumDeserializeSchema
      * @return
      */
     private Object getTimeValue(Object fieldValue, String schemaName) {
+        if (fieldValue == null) {
+            return null;
+        }
         switch (schemaName) {
             case MicroTime.SCHEMA_NAME:
                 Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
@@ -661,24 +664,20 @@ public final class RowDataDebeziumDeserializeSchema
         Schema valueSchema = record.valueSchema();
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
             GenericRowData insert = extractAfterRow(value, valueSchema);
-            validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
             emit(record, insert, tableSchema, out);
         } else if (op == Envelope.Operation.DELETE) {
             GenericRowData delete = extractBeforeRow(value, valueSchema);
-            validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
             emit(record, delete, tableSchema, out);
         } else {
             if (!appendSource) {
                 GenericRowData before = extractBeforeRow(value, valueSchema);
-                validator.validate(before, RowKind.UPDATE_BEFORE);
                 before.setRowKind(RowKind.UPDATE_BEFORE);
                 emit(record, before, tableSchema, out);
             }
 
             GenericRowData after = extractAfterRow(value, valueSchema);
-            validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
             emit(record, after, tableSchema, out);
         }
@@ -699,6 +698,9 @@ public final class RowDataDebeziumDeserializeSchema
     private void emit(SourceRecord inRecord, RowData physicalRow,
                       TableChange tableChange, Collector<RowData> collector
     ) {
+        if (!rowKindValidator.validate(physicalRow.getRowKind())) {
+            return;
+        }
         if (appendSource) {
             physicalRow.setRowKind(RowKind.INSERT);
         }
@@ -717,14 +719,6 @@ public final class RowDataDebeziumDeserializeSchema
         return resultTypeInfo;
     }
 
-    /**
-     * Custom validator to validate the row value.
-     */
-    public interface ValueValidator extends Serializable {
-
-        void validate(RowData rowData, RowKind rowKind) throws Exception;
-    }
-
     /**
      * Builder of {@link RowDataDebeziumDeserializeSchema}.
      */
@@ -733,8 +727,7 @@ public final class RowDataDebeziumDeserializeSchema
         private RowType physicalRowType;
         private TypeInformation<RowData> resultTypeInfo;
         private MetadataConverter[] metadataConverters = new MetadataConverter[0];
-        private ValueValidator validator = (rowData, rowKind) -> {
-        };
+        private RowValidator rowValidator;
         private ZoneId serverTimeZone = ZoneId.of("UTC");
         private boolean appendSource = false;
         private boolean migrateAll = false;
@@ -761,8 +754,8 @@ public final class RowDataDebeziumDeserializeSchema
             return this;
         }
 
-        public Builder setValueValidator(ValueValidator validator) {
-            this.validator = validator;
+        public Builder setValidator(RowValidator rowFilter) {
+            this.rowValidator = rowFilter;
             return this;
         }
 
@@ -787,7 +780,7 @@ public final class RowDataDebeziumDeserializeSchema
                     physicalRowType,
                     metadataConverters,
                     resultTypeInfo,
-                    validator,
+                    rowValidator,
                     serverTimeZone,
                     appendSource,
                     userDefinedConverterFactory,
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 8a211cf19..5093583fe 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -184,6 +184,17 @@ public class MySqlSourceOptions {
                             "Optional interval of sending heartbeat event for tracing the "
                                     + "latest available binlog offsets");
 
+    public static final ConfigOption<String> ROW_KINDS_FILTERED =
+        ConfigOptions.key("row-kinds-filtered")
+            .stringType()
+            .defaultValue("+I&-U&+U&-D")
+            .withDescription("row kinds to be filtered,"
+                + " here filtered means keep the data of certain row kind"
+                + "the format follows rowKind1&rowKind2, supported row kinds are "
+                + "\"+I\" represents INSERT.\n"
+                + "\"-U\" represents UPDATE_BEFORE.\n"
+                + "\"+U\" represents UPDATE_AFTER.\n"
+                + "\"-D\" represents DELETE.");
     public static final ConfigOption<Boolean> APPEND_MODE =
             ConfigOptions.key("append-mode")
                     .booleanType()
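
A hedged sketch of how row-kinds-filtered could be set in a Flink SQL DDL. The connector name comes from the MySqlExtractNode change above; the remaining option names and values are assumptions modeled on the usual MySQL CDC options:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowKindsFilteredDdlSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'row-kinds-filtered' = '+I&-D' keeps inserts and deletes, dropping -U/+U rows.
        // All options other than row-kinds-filtered are assumptions for illustration.
        tEnv.executeSql(
                "CREATE TABLE mysql_source (\n"
                        + "  id INT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'root',\n"
                        + "  'password' = 'inlong',\n"
                        + "  'database-name' = 'test',\n"
                        + "  'table-name' = 'user',\n"
                        + "  'row-kinds-filtered' = '+I&-D'\n"
                        + ")");
    }
}
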
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 9af1bd735..5f79b4980 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -23,6 +23,7 @@ import io.debezium.data.Envelope;
 import io.debezium.data.Envelope.FieldName;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
+import java.util.LinkedHashMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericArrayData;
@@ -412,7 +413,7 @@ public enum MySqlReadableMetadata {
         if (tableSchema == null) {
             return null;
         }
-        Map<String, String> mysqlType = new HashMap<>();
+        Map<String, String> mysqlType = new LinkedHashMap<>();
         final Table table = tableSchema.getTable();
         table.columns()
                 .forEach(
@@ -436,7 +437,7 @@ public enum MySqlReadableMetadata {
         if (tableSchema == null) {
             return null;
         }
-        Map<String, Integer> sqlType = new HashMap<>();
+        Map<String, Integer> sqlType = new LinkedHashMap<>();
         final Table table = tableSchema.getTable();
         table.columns().forEach(
                 column -> sqlType.put(column.name(), column.jdbcType())
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index b4610d959..ac4441362 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -51,6 +51,7 @@ import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
+import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.ROW_KINDS_FILTERED;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
 import static org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
@@ -148,7 +149,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
-
+        final String rowKindFiltered = config.get(ROW_KINDS_FILTERED).isEmpty()
+            ? ROW_KINDS_FILTERED.defaultValue() : config.get(ROW_KINDS_FILTERED);
         boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
         if (enableParallelRead) {
             validatePrimaryKeyIfEnableParallel(physicalSchema);
@@ -189,7 +191,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 heartbeatInterval,
                 migrateAll,
                 inlongMetric,
-                inlongAudit);
+                inlongAudit, rowKindFiltered);
     }
 
     @Override
@@ -233,6 +235,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         options.add(HEARTBEAT_INTERVAL);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(ROW_KINDS_FILTERED);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 621f122f6..9b6ca23e7 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.debezium.DebeziumSourceFunction;
 import org.apache.inlong.sort.cdc.debezium.table.MetadataConverter;
 import org.apache.inlong.sort.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.base.filter.RowKindValidator;
 import org.apache.inlong.sort.cdc.mysql.source.MySqlSource;
 
 import javax.annotation.Nullable;
@@ -78,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final StartupOptions startupOptions;
     private final boolean appendSource;
     private final boolean scanNewlyAddedTableEnabled;
+    private final String rowKindsFiltered;
     private final Properties jdbcProperties;
     private final Duration heartbeatInterval;
     private final boolean migrateAll;
@@ -125,7 +127,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             Duration heartbeatInterval,
             boolean migrateAll,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            String rowKindsFiltered) {
         this(
                 physicalSchema,
                 port,
@@ -153,7 +156,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 heartbeatInterval,
                 migrateAll,
                 inlongMetric,
-                inlongAudit);
+                inlongAudit,
+                rowKindsFiltered);
     }
 
     /**
@@ -186,7 +190,9 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             Duration heartbeatInterval,
             boolean migrateAll,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            String rowKindsFiltered
+            ) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -217,6 +223,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.migrateAll = migrateAll;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.rowKindsFiltered = rowKindsFiltered;
     }
 
     @Override
@@ -246,8 +253,9 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         .setResultTypeInfo(typeInfo)
                         .setServerTimeZone(serverTimeZone)
                         .setAppendSource(appendSource)
+                        .setValidator(new RowKindValidator(rowKindsFiltered))
                         .setUserDefinedConverterFactory(
-                                MySqlDeserializationConverterFactory.instance())
+                            MySqlDeserializationConverterFactory.instance())
                         .setMigrateAll(migrateAll)
                         .build();
         if (enableParallelRead) {
@@ -256,7 +264,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .hostname(hostname)
                             .port(port)
                             .databaseList(database)
-                            .tableList(database + "." + tableName)
+                            .tableList(tableName)
                             .username(username)
                             .password(password)
                             .serverTimeZone(serverTimeZone.toString())
@@ -285,7 +293,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                             .hostname(hostname)
                             .port(port)
                             .databaseList(database)
-                            .tableList(database + "." + tableName)
+                            .tableList(tableName)
                             .username(username)
                             .password(password)
                             .serverTimeZone(serverTimeZone.toString())
@@ -366,7 +374,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         heartbeatInterval,
                         migrateAll,
                         inlongMetric,
-                        inlongAudit);
+                        inlongAudit, rowKindsFiltered);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index bd3501a61..2e4f45e6d 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.parser;
 
+import java.util.ArrayList;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -29,6 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.ExtractMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -48,17 +50,22 @@ import java.util.stream.Collectors;
 public class AllMigrateTest {
 
     private MySqlExtractNode buildAllMigrateExtractNode() {
-        List<FieldInfo> fields = Collections.singletonList(
-            new MetaFieldInfo("data", MetaField.DATA));
+
         Map<String, String> option = new HashMap<>();
         option.put("append-mode", "true");
         option.put("migrate-all", "true");
+        List<String> tables = new ArrayList(10);
+        tables.add("test.*");
+        List<FieldInfo> fields = Collections.singletonList(
+            new MetaFieldInfo("data", MetaField.DATA));
+
         return new MySqlExtractNode("1", "mysql_input", fields,
                 null, option, null,
-            Collections.singletonList("test"), "localhost", "root", "inlong",
-                "test", null, null, false, null);
+            tables, "localhost", "root", "inlong",
+                "test", null, null, false, null,
+            ExtractMode.CDC, null, null);
     }
-    
+
     private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
         List<FieldInfo> fields = Collections.singletonList(
             new MetaFieldInfo("data", MetaField.DATA_BYTES));
@@ -68,9 +75,10 @@ public class AllMigrateTest {
         return new MySqlExtractNode("1", "mysql_input", fields,
             null, option, null,
             Collections.singletonList("test"), "localhost", "root", "inlong",
-            "test", null, null, false, null);
+            "test", null, null, false, "UTC-8", ExtractMode.CDC,
+            null, null);
     }
-    
+
     private KafkaLoadNode buildAllMigrateKafkaNodeWithBytesFormat() {
         List<FieldInfo> fields = Collections.singletonList(
             new FieldInfo("data", new VarBinaryFormatInfo()));
@@ -84,7 +92,7 @@ public class AllMigrateTest {
                 csvFormat, null,
                 null, null);
     }
-    
+
     private KafkaLoadNode buildAllMigrateKafkaNode() {
         List<FieldInfo> fields = Collections.singletonList(
             new FieldInfo("data", new StringFormatInfo()));