Posted to commits@inlong.apache.org by zi...@apache.org on 2022/04/29 07:17:42 UTC

[incubator-inlong] branch master updated: [INLONG-3996][Sort] Sort support all migrate for database

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 30d6c6445 [INLONG-3996][Sort] Sort support all migrate for database
30d6c6445 is described below

commit 30d6c6445746b15fd11abd1865f90c5458f39edb
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Apr 29 15:17:36 2022 +0800

    [INLONG-3996][Sort] Sort support all migrate for database
---
 .../sort/protocol/node/format/CsvFormat.java       |  12 +-
 inlong-sort/sort-formats/format-json/pom.xml       |   4 +
 .../inlong/sort/formats/json/canal/CanalJson.java  | 115 ++++++++++++++++
 .../debezium/table/AppendMetadataCollector.java    |  10 +-
 .../table/RowDataDebeziumDeserializeSchema.java    | 148 ++++++++++++++++++---
 .../mysql/source/config/MySqlSourceOptions.java    |   6 +
 .../cdc/mysql/table/MySqlReadableMetadata.java     | 148 +++++++++++++++------
 .../mysql/table/MySqlTableInlongSourceFactory.java |   6 +-
 .../flink/cdc/mysql/table/MySqlTableSource.java    |  16 ++-
 .../singletenant/flink/parser/AllMigrateTest.java  | 109 +++++++++++++++
 .../flink/parser/FlinkSqlParserTest.java           |  50 +++----
 11 files changed, 522 insertions(+), 102 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
index 232bda60c..60cfba67b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CsvFormat.java
@@ -40,7 +40,7 @@ public class CsvFormat implements Format {
 
     @JsonProperty(value = "fieldDelimiter", defaultValue = ",")
     private String fieldDelimiter;
-    @JsonProperty(value = "disableQuoteCharacter", defaultValue = "false")
+    @JsonProperty(value = "disableQuoteCharacter", defaultValue = "true")
     private Boolean disableQuoteCharacter;
     @JsonProperty(value = "quoteCharacter", defaultValue = "\"")
     private String quoteCharacter;
@@ -76,7 +76,7 @@ public class CsvFormat implements Format {
 
     @JsonCreator
     public CsvFormat() {
-        this(",", false, "\"", false, true, ";", null, null);
+        this(",", true, null, false, true, ";", null, null);
     }
 
     @JsonIgnore
@@ -90,14 +90,16 @@ public class CsvFormat implements Format {
      *
      * @return options
      */
+    @Override
     public Map<String, String> generateOptions() {
         Map<String, String> options = new HashMap<>(16);
         options.put("format", getFormat());
         options.put("csv.field-delimiter", this.fieldDelimiter);
-        if (this.disableQuoteCharacter != null) {
-            options.put("csv.disable-quote-character", this.disableQuoteCharacter.toString());
+        options.put("csv.disable-quote-character", this.disableQuoteCharacter.toString());
+        // the disable-quote flag and an explicit quote character cannot both be set
+        if (!this.disableQuoteCharacter) {
+            options.put("csv.quote-character", this.quoteCharacter);
         }
-        options.put("csv.quote-character", this.quoteCharacter);
         if (this.allowComments != null) {
             options.put("csv.allow-comments", this.allowComments.toString());
         }
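
For context, the CSV options above now treat quoting as an either/or: when quoting is
disabled, no csv.quote-character entry is written at all. A minimal sketch of the resulting
options, assuming the sort-common module is on the classpath (the class name CsvFormatSketch
is illustrative, not part of the commit):

    import java.util.Map;
    import org.apache.inlong.sort.protocol.node.format.CsvFormat;

    public class CsvFormatSketch {
        public static void main(String[] args) {
            // the new default constructor disables quoting entirely
            CsvFormat csv = new CsvFormat();
            Map<String, String> options = csv.generateOptions();
            System.out.println(options.get("csv.disable-quote-character")); // true
            System.out.println(options.get("csv.quote-character"));         // null, key omitted
        }
    }
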
diff --git a/inlong-sort/sort-formats/format-json/pom.xml b/inlong-sort/sort-formats/format-json/pom.xml
index ad346cabb..9a0846c96 100644
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ b/inlong-sort/sort-formats/format-json/pom.xml
@@ -72,6 +72,10 @@
             <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
new file mode 100644
index 000000000..8ea272436
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+
+@Builder
+public class CanalJson {
+
+    private List<Map<String, Object>> data;
+    private long es;
+    private String table;
+    private String type;
+    private String database;
+    private long ts;
+    private String sql;
+    private Map<String, String> mysqlType;
+    private boolean isDdl;
+    private List<String> pkNames;
+
+    public List<Map<String, Object>> getData() {
+        return data;
+    }
+
+    public void setData(List<Map<String, Object>> data) {
+        this.data = data;
+    }
+
+    public long getEs() {
+        return es;
+    }
+
+    public void setEs(long es) {
+        this.es = es;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public long getTs() {
+        return ts;
+    }
+
+    public void setTs(long ts) {
+        this.ts = ts;
+    }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public void setSql(String sql) {
+        this.sql = sql;
+    }
+
+    public Map<String, String> getMysqlType() {
+        return mysqlType;
+    }
+
+    public void setMysqlType(Map<String, String> mysqlType) {
+        this.mysqlType = mysqlType;
+    }
+
+    public void setDdl(boolean ddl) {
+        isDdl = ddl;
+    }
+
+    public List<String> getPkNames() {
+        return pkNames;
+    }
+
+    public void setPkNames(List<String> pkNames) {
+        this.pkNames = pkNames;
+    }
+
+}
+
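
CanalJson is a plain holder with a Lombok @Builder; the MySqlReadableMetadata changes further
below build one instance per change event and serialize it with Jackson. A minimal sketch of
that flow with hypothetical values (the class name CanalJsonSketch is illustrative):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.inlong.sort.formats.json.canal.CanalJson;

    public class CanalJsonSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> row = new HashMap<>();
            row.put("id", 1);                                  // hypothetical column value
            CanalJson canalJson = CanalJson.builder()
                .database("test_db").table("test_table")       // hypothetical names
                .type("INSERT").isDdl(false)
                .es(1651216656000L).ts(1651216656123L).sql("")
                .data(Collections.singletonList(row))
                .build();
            // Jackson turns the getters into a canal-style JSON string
            System.out.println(new ObjectMapper().writeValueAsString(canalJson));
        }
    }
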
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
index 6e83a8358..005254f6c 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/AppendMetadataCollector.java
@@ -39,17 +39,23 @@ public final class AppendMetadataCollector implements Collector<RowData>, Serial
 
     public transient SourceRecord inputRecord;
     public transient Collector<RowData> outputCollector;
+    private boolean migrateAll;
 
-    public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+    public AppendMetadataCollector(MetadataConverter[] metadataConverters, boolean migrateAll) {
         this.metadataConverters = metadataConverters;
+        this.migrateAll = migrateAll;
     }
 
     public void collect(RowData physicalRow, TableChange tableSchema) {
         GenericRowData metaRow = new GenericRowData(metadataConverters.length);
         for (int i = 0; i < metadataConverters.length; i++) {
-            Object meta = metadataConverters[i].read(inputRecord, tableSchema);
+            Object meta = metadataConverters[i].read(inputRecord, tableSchema, physicalRow);
             metaRow.setField(i, meta);
         }
+        if (migrateAll) {
+            // all data is carried in the meta row, so emit an empty physical row
+            physicalRow = new GenericRowData(0);
+        }
         RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
         outputCollector.collect(outRow);
     }
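
In the migrate-all path the whole change event is carried by the metadata row and the physical
row is emitted with arity 0. A minimal sketch of the joined result, assuming Flink's table
runtime is on the classpath (class name and payload are illustrative):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.utils.JoinedRowData;
    import org.apache.flink.types.RowKind;

    public class MigrateAllRowSketch {
        public static void main(String[] args) {
            // metadata row holding a (hypothetical) canal-json payload
            GenericRowData metaRow = new GenericRowData(1);
            metaRow.setField(0, StringData.fromString("{\"type\":\"INSERT\"}"));
            // the physical row is empty, so only metadata columns survive the join
            RowData outRow = new JoinedRowData(RowKind.INSERT, new GenericRowData(0), metaRow);
            System.out.println(outRow.getArity()); // 1
        }
    }
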
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
index 217b4264c..4065c814e 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -24,17 +24,25 @@ import io.debezium.data.Envelope;
 import io.debezium.data.SpecialValueDecimal;
 import io.debezium.data.VariableScaleDecimal;
 import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.time.Date;
 import io.debezium.time.MicroTime;
 import io.debezium.time.MicroTimestamp;
 import io.debezium.time.NanoTime;
 import io.debezium.time.NanoTimestamp;
 import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTimestamp;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.time.Instant;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.DecimalData;
@@ -49,11 +57,15 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.singletenant.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.singletenant.flink.cdc.debezium.utils.TemporalConversions;
+import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
@@ -62,7 +74,17 @@ import org.apache.kafka.connect.source.SourceRecord;
 public final class RowDataDebeziumDeserializeSchema
         implements DebeziumDeserializationSchema<RowData> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
+
     private static final long serialVersionUID = 2L;
+
+    private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+
+    private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+
+    private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern(
+        "yyyy-MM-dd HH:mm:ss");
+
     /**
      * TypeInformation of the produced {@link RowData}. *
      */
@@ -89,6 +111,10 @@ public final class RowDataDebeziumDeserializeSchema
      */
     private final ValueValidator validator;
 
+    private boolean migrateAll;
+
+    private ZoneId serverTimeZone;
+
     RowDataDebeziumDeserializeSchema(
             RowType physicalDataType,
             MetadataConverter[] metadataConverters,
@@ -96,9 +122,12 @@ public final class RowDataDebeziumDeserializeSchema
             ValueValidator validator,
             ZoneId serverTimeZone,
             boolean appendSource,
-            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+            boolean migrateAll) {
         this.hasMetadata = checkNotNull(metadataConverters).length > 0;
-        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters, migrateAll);
+        this.migrateAll = migrateAll;
+        this.serverTimeZone = serverTimeZone;
         this.physicalConverter =
                 createConverter(
                         checkNotNull(physicalDataType),
@@ -119,7 +148,7 @@ public final class RowDataDebeziumDeserializeSchema
     /**
      * Creates a runtime converter which is null safe.
      */
-    private static DeserializationRuntimeConverter createConverter(
+    private DeserializationRuntimeConverter createConverter(
             LogicalType type,
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
@@ -130,7 +159,7 @@ public final class RowDataDebeziumDeserializeSchema
     /**
      * Creates a runtime converter which assuming input object is not null.
      */
-    public static DeserializationRuntimeConverter createNotNullConverter(
+    public DeserializationRuntimeConverter createNotNullConverter(
             LogicalType type,
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
@@ -474,7 +503,7 @@ public final class RowDataDebeziumDeserializeSchema
         };
     }
 
-    private static DeserializationRuntimeConverter createRowConverter(
+    private DeserializationRuntimeConverter createRowConverter(
             RowType rowType,
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
@@ -490,33 +519,104 @@ public final class RowDataDebeziumDeserializeSchema
                         .toArray(DeserializationRuntimeConverter[]::new);
         final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
 
+        if (!migrateAll) {
+            return new DeserializationRuntimeConverter() {
+
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object convert(Object dbzObj, Schema schema) throws Exception {
+                    Struct struct = (Struct) dbzObj;
+                    int arity = fieldNames.length;
+                    GenericRowData row = new GenericRowData(arity);
+                    for (int i = 0; i < arity; i++) {
+                        String fieldName = fieldNames[i];
+                        Field field = schema.field(fieldName);
+                        if (field == null) {
+                            row.setField(i, null);
+                        } else {
+                            Object fieldValue = struct.getWithoutDefault(fieldName);
+                            Schema fieldSchema = schema.field(fieldName).schema();
+                            Object convertedField =
+                                convertField(fieldConverters[i], fieldValue, fieldSchema);
+                            row.setField(i, convertedField);
+                        }
+                    }
+                    return row;
+                }
+            };
+        } else {
+            return getAllMigrationConverter();
+        }
+    }
+
+    @NotNull
+    private DeserializationRuntimeConverter getAllMigrationConverter() {
         return new DeserializationRuntimeConverter() {
 
             private static final long serialVersionUID = 1L;
 
             @Override
-            public Object convert(Object dbzObj, Schema schema) throws Exception {
+            public Object convert(Object dbzObj, Schema schema)  {
+
+                ConnectSchema connectSchema = (ConnectSchema) schema;
+                List<Field> fields = connectSchema.fields();
+
+                Map<String, Object> data = new HashMap<>();
                 Struct struct = (Struct) dbzObj;
-                int arity = fieldNames.length;
-                GenericRowData row = new GenericRowData(arity);
-                for (int i = 0; i < arity; i++) {
-                    String fieldName = fieldNames[i];
-                    Field field = schema.field(fieldName);
-                    if (field == null) {
-                        row.setField(i, null);
-                    } else {
-                        Object fieldValue = struct.getWithoutDefault(fieldName);
-                        Schema fieldSchema = schema.field(fieldName).schema();
-                        Object convertedField =
-                                convertField(fieldConverters[i], fieldValue, fieldSchema);
-                        row.setField(i, convertedField);
+
+                for (Field field : fields) {
+                    String fieldName = field.name();
+                    Object fieldValue = struct.getWithoutDefault(fieldName);
+                    Schema fieldSchema = schema.field(fieldName).schema();
+                    String schemaName = fieldSchema.name();
+                    if (schemaName != null) {
+                        // plain types carry no schema name;
+                        // a named schema here indicates a Debezium temporal type
+                        fieldValue = getTimeValue(fieldValue, schemaName);
                     }
+                    data.put(fieldName, fieldValue);
                 }
+
+                GenericRowData row = new GenericRowData(1);
+                row.setField(0, data);
+
                 return row;
             }
         };
     }
 
+    /**
+     * Transforms a Debezium temporal value into the database's string representation.
+     * @param fieldValue the raw Debezium field value
+     * @param schemaName the Debezium logical schema name of the field
+     * @return the formatted value, or the original value if the schema name is not recognized
+     */
+    private Object getTimeValue(Object fieldValue, String schemaName) {
+        switch (schemaName) {
+            case MicroTime.SCHEMA_NAME:
+                Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
+                fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, serverTimeZone));
+                break;
+            case Date.SCHEMA_NAME:
+                fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
+                break;
+            case ZonedTimestamp.SCHEMA_NAME:
+                ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
+                fieldValue = timestampFormatter.format(zonedDateTime
+                    .withZoneSameInstant(serverTimeZone).toLocalDateTime());
+                break;
+            case Timestamp.SCHEMA_NAME:
+                Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
+                fieldValue = timestampFormatter.format(LocalDateTime.ofInstant(instantTime,
+                    serverTimeZone));
+                break;
+            default:
+                LOG.error("unsupported time schema {}, value is returned unchanged", schemaName);
+        }
+        return fieldValue;
+    }
+
     private static Object convertField(
             DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
             throws Exception {
@@ -548,6 +648,7 @@ public final class RowDataDebeziumDeserializeSchema
         deserialize(record, out, null);
     }
 
+    @Override
     public void deserialize(SourceRecord record, Collector<RowData> out,
             TableChange tableSchema)
             throws Exception {
@@ -632,6 +733,7 @@ public final class RowDataDebeziumDeserializeSchema
         };
         private ZoneId serverTimeZone = ZoneId.of("UTC");
         private boolean appendSource = false;
+        private boolean migrateAll = false;
         private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
                 DeserializationRuntimeConverterFactory.DEFAULT;
 
@@ -640,6 +742,11 @@ public final class RowDataDebeziumDeserializeSchema
             return this;
         }
 
+        public Builder setMigrateAll(boolean migrateAll) {
+            this.migrateAll = migrateAll;
+            return this;
+        }
+
         public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
             this.metadataConverters = metadataConverters;
             return this;
@@ -679,7 +786,8 @@ public final class RowDataDebeziumDeserializeSchema
                     validator,
                     serverTimeZone,
                     appendSource,
-                    userDefinedConverterFactory);
+                    userDefinedConverterFactory,
+                    migrateAll);
         }
     }
 }
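
The getTimeValue helper above maps Debezium's temporal encodings back to plain database-style
strings via java.time. A small standalone sketch of the same conversions (values are
illustrative; the class name TimeValueSketch is not part of the commit):

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    public class TimeValueSketch {
        public static void main(String[] args) {
            ZoneId serverTimeZone = ZoneId.of("UTC");   // assumed server time zone
            DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

            // io.debezium.time.Date encodes the value as days since the epoch (int)
            int dateValue = 19111;
            System.out.println(DateTimeFormatter.ISO_DATE.format(LocalDate.ofEpochDay(dateValue)));
            // -> 2022-04-29

            // io.debezium.time.Timestamp encodes the value as epoch milliseconds (long)
            long tsValue = 1651216656000L;
            System.out.println(timestampFormatter.format(
                LocalDateTime.ofInstant(Instant.ofEpochMilli(tsValue), serverTimeZone)));
            // -> 2022-04-29 07:17:36
        }
    }
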
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java
index 8cc49ff93..c1011aa5f 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -189,6 +189,12 @@ public class MySqlSourceOptions {
                     .defaultValue(false)
                     .withDescription("Whether works as append source.");
 
+    public static final ConfigOption<Boolean> MIGRATE_ALL =
+        ConfigOptions.key("migrate-all")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to migrate all databases.");
+
     // ----------------------------------------------------------------------------
     // experimental options, won't add them to documentation
     // ----------------------------------------------------------------------------
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
index 37cf1bd07..5e4d4bf3d 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlReadableMetadata.java
@@ -20,18 +20,24 @@ package org.apache.inlong.sort.singletenant.flink.cdc.mysql.table;
 
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
 import org.apache.inlong.sort.singletenant.flink.cdc.debezium.table.MetadataConverter;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -51,10 +57,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(
-                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+                    return StringData.fromString(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY));
                 }
             }),
 
@@ -69,10 +72,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(
-                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+                    return StringData.fromString(getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY));
                 }
             }),
 
@@ -96,26 +96,55 @@ public enum MySqlReadableMetadata {
             }),
 
     DATA(
-            "meta.data",
-            DataTypes.BIGINT(),
-            new MetadataConverter() {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                public Object read(SourceRecord record) {
-                    record.value().toString();
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    return TimestampData.fromEpochMillis(
-                            (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
-                }
-
-                @Override
-                public Object read(SourceRecord record,
-                        @org.jetbrains.annotations.Nullable TableChanges.TableChange tableSchema, RowData rowData) {
-                    return rowData.getLong(0);
+        "meta.data",
+        DataTypes.STRING(),
+        new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object read(SourceRecord record) {
+                record.value().toString();
+                Struct messageStruct = (Struct) record.value();
+                Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+                // read the operation timestamp from the source struct
+                return TimestampData.fromEpochMillis(
+                    (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+            }
+
+            @Override
+            public Object read(SourceRecord record,
+                @org.jetbrains.annotations.Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+                // construct canal json
+                Struct messageStruct = (Struct) record.value();
+                Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+                // tableName
+                String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
+                // databaseName
+                String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
+                // opTs
+                long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+                // ts
+                long ts = (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP);
+                // actual data
+                GenericRowData data = (GenericRowData) rowData;
+                Map<String, Object> field = (Map<String, Object>) data.getField(0);
+                List<Map<String, Object>> dataList = new ArrayList<>();
+                dataList.add(field);
+
+                CanalJson canalJson = CanalJson.builder()
+                    .data(dataList).database(databaseName)
+                    .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
+                    .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+                    .type(getOpType(record)).build();
+
+                try {
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    return StringData.fromString(objectMapper.writeValueAsString(canalJson));
+                } catch (Exception e) {
+                    throw new IllegalStateException("exception occurred while serializing canal json metadata", e);
                 }
-            }),
+            }
+        }),
 
     /**
      * Name of the table that contain the row. .
@@ -128,10 +157,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(
-                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+                    return getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
                 }
             }),
 
@@ -146,10 +172,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct messageStruct = (Struct) record.value();
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    return StringData.fromString(
-                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+                    return getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
                 }
             }),
 
@@ -183,14 +206,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    final Envelope.Operation op = Envelope.operationFor(record);
-                    if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
-                        return StringData.fromString("INSERT");
-                    } else if (op == Envelope.Operation.DELETE) {
-                        return StringData.fromString("DELETE");
-                    } else {
-                        return StringData.fromString("UPDATE");
-                    }
+                    return StringData.fromString(getOpType(record));
                 }
             }),
 
@@ -362,10 +378,54 @@ public enum MySqlReadableMetadata {
                 public Object read(SourceRecord record) {
                     Struct messageStruct = (Struct) record.value();
                     return TimestampData.fromEpochMillis(
-                            (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP));
+                        (Long) messageStruct.get(Envelope.FieldName.TIMESTAMP));
                 }
             });
 
+    private static String getOpType(SourceRecord record) {
+        String opType;
+        final Envelope.Operation op = Envelope.operationFor(record);
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            opType = "INSERT";
+        } else if (op == Envelope.Operation.DELETE) {
+            opType = "DELETE";
+        } else {
+            opType = "UPDATE";
+        }
+        return opType;
+    }
+
+    private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        return tableSchema.getTable().primaryKeyColumnNames();
+    }
+
+    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
+        if (tableSchema == null) {
+            return null;
+        }
+        Map<String, String> mysqlType = new HashMap<>();
+        final Table table = tableSchema.getTable();
+        table.columns()
+            .forEach(
+                column -> {
+                    mysqlType.put(
+                        column.name(),
+                        String.format(
+                                "%s(%d)",
+                                column.typeName(),
+                                column.length()));
+                });
+        return mysqlType;
+    }
+
+    private static String getMetaData(SourceRecord record, String tableNameKey) {
+        Struct messageStruct = (Struct) record.value();
+        Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+        return sourceStruct.getString(tableNameKey);
+    }
 
     private final String key;
 
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index a7a7bb2eb..9d4382e29 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -28,6 +28,7 @@ import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.
 import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
 import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
 import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
+import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
 import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
 import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.PORT;
 import static org.apache.inlong.sort.singletenant.flink.cdc.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@@ -137,6 +138,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
         final boolean appendSource = config.get(APPEND_MODE);
+        final boolean migrateAll = config.get(MIGRATE_ALL);
         double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
@@ -179,7 +181,8 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 startupOptions,
                 scanNewlyAddedTableEnabled,
                 JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
-                heartbeatInterval);
+                heartbeatInterval,
+                migrateAll);
     }
 
     @Override
@@ -218,6 +221,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
         options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(CONNECT_MAX_RETRIES);
         options.add(APPEND_MODE);
+        options.add(MIGRATE_ALL);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(HEARTBEAT_INTERVAL);
         return options;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
index 9c1c6baff..792dc304a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/cdc/mysql/table/MySqlTableSource.java
@@ -79,7 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final boolean scanNewlyAddedTableEnabled;
     private final Properties jdbcProperties;
     private final Duration heartbeatInterval;
-
+    private final boolean migrateAll;
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -116,7 +116,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             double distributionFactorLower,
             boolean appendSource,
             StartupOptions startupOptions,
-            Duration heartbeatInterval) {
+            Duration heartbeatInterval,
+            boolean migrateAll) {
         this(
                 physicalSchema,
                 port,
@@ -141,7 +142,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                 startupOptions,
                 false,
                 new Properties(),
-                heartbeatInterval);
+                heartbeatInterval,
+                migrateAll);
     }
 
     public MySqlTableSource(
@@ -168,7 +170,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
             StartupOptions startupOptions,
             boolean scanNewlyAddedTableEnabled,
             Properties jdbcProperties,
-            Duration heartbeatInterval) {
+            Duration heartbeatInterval,
+            boolean migrateAll) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -196,6 +199,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
         this.producedDataType = physicalSchema.toPhysicalRowDataType();
         this.metadataKeys = Collections.emptyList();
         this.heartbeatInterval = heartbeatInterval;
+        this.migrateAll = migrateAll;
     }
 
     @Override
@@ -227,6 +231,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         .setAppendSource(appendSource)
                         .setUserDefinedConverterFactory(
                                 MySqlDeserializationConverterFactory.instance())
+                        .setMigrateAll(migrateAll)
                         .build();
         if (enableParallelRead) {
             MySqlSource<RowData> parallelSource =
@@ -337,7 +342,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         startupOptions,
                         scanNewlyAddedTableEnabled,
                         jdbcProperties,
-                        heartbeatInterval);
+                        heartbeatInterval,
+                        migrateAll);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/AllMigrateTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/AllMigrateTest.java
new file mode 100644
index 000000000..7347b2078
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/AllMigrateTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import static org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField.MYSQL_METADATA_DATA;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AllMigrateTest {
+
+    private MySqlExtractNode buildAllMigrateExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+            new BuiltInFieldInfo("data", new StringFormatInfo(), MYSQL_METADATA_DATA));
+        Map<String, String> option = new HashMap<>();
+        option.put("append-mode", "true");
+        option.put("migrate-all", "true");
+        MySqlExtractNode node = new MySqlExtractNode("1", "mysql_input", fields,
+            null, option, null,
+            Arrays.asList("[\\s\\S]*.*"), "localhost", "root", "password",
+            "[\\s\\S]*.*", null, null, false, null);
+        return node;
+    }
+
+    private KafkaLoadNode buildAllMigrateKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("data", new StringFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+            .asList(new FieldRelationShip(new FieldInfo("data", new StringFormatInfo()),
+                new FieldInfo("data", new StringFormatInfo())));
+        CsvFormat csvFormat = new CsvFormat();
+        csvFormat.setDisableQuoteCharacter(true);
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+            "topic", "localhost:9092",
+            csvFormat, null,
+            null, null);
+    }
+
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    /**
+     * Test flink sql parse
+     *
+     * @throws Exception The exception may throws when execute the case
+     */
+    @Test
+    public void testAllMigrate() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+            .newInstance()
+            .useBlinkPlanner()
+            .inStreamingMode()
+            .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildAllMigrateExtractNode();
+        Node outputNode = buildAllMigrateKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+            Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index f97367eb4..14ca70205 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -52,37 +52,37 @@ public class FlinkSqlParserTest extends AbstractTestBase {
 
     private MySqlExtractNode buildMySQLExtractNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
-                new FieldInfo("name", new StringFormatInfo()),
-                new FieldInfo("age", new IntFormatInfo()),
-                new FieldInfo("salary", new FloatFormatInfo()),
-                new FieldInfo("ts", new TimestampFormatInfo()));
+            new FieldInfo("name", new StringFormatInfo()),
+            new FieldInfo("age", new IntFormatInfo()),
+            new FieldInfo("salary", new FloatFormatInfo()),
+            new FieldInfo("ts", new TimestampFormatInfo()));
         return new MySqlExtractNode("1", "mysql_input", fields,
-                null, null, "id",
-                Collections.singletonList("test"), "localhost", "username", "username",
-                "test_database", null, null,
-                null, null);
+            null, null, "id",
+            Collections.singletonList("test"), "localhost", "username", "username",
+            "test_database", null, null,
+            null, null);
     }
 
     private KafkaLoadNode buildKafkaNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
-                new FieldInfo("name", new StringFormatInfo()),
-                new FieldInfo("age", new IntFormatInfo()),
-                new FieldInfo("salary", new FloatFormatInfo()),
-                new FieldInfo("ts", new TimestampFormatInfo()));
+            new FieldInfo("name", new StringFormatInfo()),
+            new FieldInfo("age", new IntFormatInfo()),
+            new FieldInfo("salary", new FloatFormatInfo()),
+            new FieldInfo("ts", new TimestampFormatInfo()));
         List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
-                                new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
-                                new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
-                                new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
-                                new FieldInfo("ts", new TimestampFormatInfo()))
-                );
+            .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                    new FieldInfo("id", new LongFormatInfo())),
+                new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                    new FieldInfo("name", new StringFormatInfo())),
+                new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                    new FieldInfo("age", new IntFormatInfo())),
+                new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                    new FieldInfo("ts", new TimestampFormatInfo()))
+            );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
-                "topic", "localhost:9092",
-                new CanalJsonFormat(), null,
-                null, null);
+            "topic", "localhost:9092",
+            new CanalJsonFormat(), null,
+            null, null);
     }
 
     private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -120,7 +120,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
      * @throws Exception The exception may throws when execute the case
      */
     @Test
-    public void testMysqlToHive() throws Exception {
+    public void testMysqlToHive() {
         EnvironmentSettings settings = EnvironmentSettings
                 .newInstance()
                 .useBlinkPlanner()