You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/15 06:58:21 UTC

[incubator-inlong] branch master updated: [INLONG-3130][Sort] Support extract specified metadata from input data with canal format (#3139)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95887dd  [INLONG-3130][Sort] Support extract specified metadata from input data with canal format (#3139)
95887dd is described below

commit 95887dd6e424b777ce44ec8181b0fb935d193e27
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Tue Mar 15 14:58:15 2022 +0800

    [INLONG-3130][Sort] Support extract specified metadata from input data with canal format (#3139)
---
 .../deserialization/CanalDeserializationInfo.java  |   1 +
 .../json/canal/CanalJsonDecodingFormat.java        |   8 +-
 .../json/canal/CanalJsonDeserializationSchema.java |  40 +++---
 .../inlong/sort/formats/json/canal/CanalUtils.java |  40 ++++++
 .../inlong/sort/singletenant/flink/Entrance.java   |   6 +-
 .../CanalDeserializationSchemaBuilder.java         | 144 +++++++++++----------
 .../DebeziumDeserializationSchemaBuilder.java      |  11 +-
 .../DeserializationSchemaFactory.java              |   2 +-
 .../deserialization/FieldMappingTransformer.java   |   6 +-
 .../sort/singletenant/flink/utils/CommonUtils.java |  12 ++
 .../deserialization/CanalDeserializationTest.java  | 103 ++++++++-------
 .../DeserializationFunctionTest.java               |  92 +++++++++++++
 12 files changed, 317 insertions(+), 148 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
index a6ac92c..175cd8a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
@@ -42,6 +42,7 @@ public class CanalDeserializationInfo implements DeserializationInfo {
     @JsonProperty("timestamp_format_standard")
     private final String timestampFormatStandard;
 
+    @Deprecated
     @JsonProperty("include_metadata")
     private final boolean includeMetadata;
 
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
index 343f4e9..c6ac39c 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
@@ -231,7 +231,7 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
 
         EVENT_TIMESTAMP(
                 "event-timestamp",
-                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.BIGINT().nullable(),
                 DataTypes.FIELD("es", DataTypes.BIGINT()),
                 new MetadataConverter() {
                     private static final long serialVersionUID = 1L;
@@ -241,7 +241,7 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
                         if (row.isNullAt(pos)) {
                             return null;
                         }
-                        return TimestampData.fromEpochMillis(row.getLong(pos));
+                        return row.getLong(pos);
                     }
 
                     @Override
@@ -289,5 +289,9 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
             this.requiredJsonField = requiredJsonField;
             this.converter = converter;
         }
+
+        public String getKey() {
+            return key;
+        }
     }
 }
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
index 089debc..c0012ab 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -19,10 +19,13 @@
 package org.apache.inlong.sort.formats.json.canal;
 
 import static java.lang.String.format;
+import static org.apache.inlong.sort.formats.json.canal.CanalUtils.getMysqlMetadataKey;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -35,8 +38,10 @@ import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -67,12 +72,11 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
     /** The deserializer to deserialize Canal JSON data. */
     private final JsonRowDataDeserializationSchema jsonDeserializer;
 
-    /** Flag that indicates that an additional projection is required for metadata. */
-    private final boolean hasMetadata;
-
     /** Metadata to be extracted for every record. */
     private final MetadataConverter[] metadataConverters;
 
+    private final List<ReadableMetadata> requestedMetadata;
+
     /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
     private final TypeInformation<RowData> producedTypeInfo;
 
@@ -116,8 +120,8 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
                         // failOnMissingField
                         ignoreParseErrors,
                         timestampFormat);
-        this.hasMetadata = requestedMetadata.size() > 0;
         this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+        this.requestedMetadata = requestedMetadata;
         this.producedTypeInfo = producedTypeInfo;
         this.database = database;
         this.table = table;
@@ -284,24 +288,26 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
         }
     }
 
-    private void emitRow(
-            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
-        // shortcut in case no output projection is required
-        if (!hasMetadata) {
-            out.collect(physicalRow);
-            return;
-        }
+    private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
         final int physicalArity = physicalRow.getArity();
         final int metadataArity = metadataConverters.length;
-        final GenericRowData producedRow =
-                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+        final GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + 1);
+
         for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
-            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+            producedRow.setField(physicalPos + 1, physicalRow.getField(physicalPos));
         }
+
+        // Put metadata in the first field of the emitted RowData
+        Map<StringData, StringData> metadataMap = new HashMap<>();
+
         for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
-            producedRow.setField(
-                    physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+            metadataMap.put(
+                    StringData.fromString(getMysqlMetadataKey(requestedMetadata.get(metadataPos))),
+                    StringData.fromString(metadataConverters[metadataPos].convert(rootRow).toString())
+            );
         }
+        producedRow.setField(0, new GenericMapData(metadataMap));
+
         out.collect(producedRow);
     }
 
@@ -325,7 +331,6 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
         }
         CanalJsonDeserializationSchema that = (CanalJsonDeserializationSchema) o;
         return Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && hasMetadata == that.hasMetadata
                 && Objects.equals(producedTypeInfo, that.producedTypeInfo)
                 && Objects.equals(database, that.database)
                 && Objects.equals(table, that.table)
@@ -337,7 +342,6 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
     public int hashCode() {
         return Objects.hash(
                 jsonDeserializer,
-                hasMetadata,
                 producedTypeInfo,
                 database,
                 table,
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalUtils.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalUtils.java
new file mode 100644
index 0000000..3c9ce61
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.inlong.sort.formats.json.MysqlBinLogData;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
+
+public class CanalUtils {
+
+    public static String getMysqlMetadataKey(ReadableMetadata readableMetadata) {
+        switch (readableMetadata) {
+            case DATABASE:
+                return MysqlBinLogData.MYSQL_METADATA_DATABASE;
+            case TABLE:
+                return MysqlBinLogData.MYSQL_METADATA_TABLE;
+            case IS_DDL:
+                return MysqlBinLogData.MYSQL_METADATA_IS_DDL;
+            case EVENT_TIMESTAMP:
+                return MysqlBinLogData.MYSQL_METADATA_EVENT_TIME;
+            default:
+                throw new IllegalArgumentException("Not supported yet");
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index ad42a82..5c78a26 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -41,6 +41,7 @@ import org.apache.inlong.sort.flink.hive.HiveCommitter;
 import org.apache.inlong.sort.flink.hive.HiveWriter;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
@@ -155,10 +156,13 @@ public class Entrance {
                 sourceFields, sourceInfo.getDeserializationInfo());
         FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(config, sourceFields);
 
+        // Currently, canal and debezium deserialization schema will put a map at the first position
+        // of the deserialized row. So the `appendAttributes` flag should be set false.
         DeserializationFunction function = new DeserializationFunction(
                 schema,
                 fieldMappingTransformer,
-                !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo));
+                !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo)
+                        && !(sourceInfo.getDeserializationInfo() instanceof CanalDeserializationInfo));
 
         DataStream<Row> deserializedStream = sourceStream.process(function)
                 .uid(Constants.DESERIALIZATION_SCHEMA_UID)
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
index 7ca93a5..57c0b7a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
@@ -17,53 +17,59 @@
 
 package org.apache.inlong.sort.singletenant.flink.deserialization;
 
+import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getProducedFieldInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.canal.CanalJsonDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
-import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
-import org.apache.inlong.sort.formats.common.IntFormatInfo;
-import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
+import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
 
 public class CanalDeserializationSchemaBuilder {
 
-    private static final List<String> ALL_SUPPORTED_METADATA_KEYS =
-            Arrays.asList("database", "table", "sql-type", "pk-names", "ingestion-timestamp", "event-timestamp");
-
     public static DeserializationSchema<Row> build(
             FieldInfo[] fieldInfos,
             CanalDeserializationInfo deserializationInfo
     ) throws IOException, ClassNotFoundException {
         String timestampFormatStandard = deserializationInfo.getTimestampFormatStandard();
-        boolean includeMetadata = deserializationInfo.isIncludeMetadata();
-        CanalJsonDecodingFormat canalJsonDecodingFormat = createCanalJsonDecodingFormat(
+        CanalJsonDecodingFormat canalJsonDecodingFormat = new CanalJsonDecodingFormat(
                 deserializationInfo.getDatabase(),
                 deserializationInfo.getTable(),
                 deserializationInfo.isIgnoreParseErrors(),
-                timestampFormatStandard,
-                includeMetadata
+                getTimestampFormatStandard(timestampFormatStandard)
         );
 
-        FieldInfo[] convertedInputFields = convertDateToStringFormatInfo(fieldInfos);
+        // Extract required metadata
+        FieldInfo[] metadataFieldInfos = getMetadataFieldInfos(fieldInfos);
+        List<String> requiredMetadataKeys = Arrays.stream(metadataFieldInfos)
+                .map(FieldInfo::getName)
+                .collect(Collectors.toList());
+        canalJsonDecodingFormat.applyReadableMetadata(requiredMetadataKeys);
+
+        FieldInfo[] originPhysicalFieldInfos = CommonUtils.extractNonBuiltInFieldInfos(fieldInfos, false);
+        FieldInfo[] convertedPhysicalFieldInfos = convertDateToStringFormatInfo(originPhysicalFieldInfos);
         DeserializationSchema<RowData> canalSchema = canalJsonDecodingFormat.createRuntimeDecoder(
                 new DynamicTableSource.Context() {
                     @Override
@@ -77,61 +83,59 @@ public class CanalDeserializationSchemaBuilder {
                         return null;
                     }
                 },
-                convertFieldInfosToDataType(convertedInputFields)
+                convertFieldInfosToDataType(convertedPhysicalFieldInfos)
         );
 
-        return wrapCanalDeserializationSchema(canalSchema, includeMetadata, fieldInfos, timestampFormatStandard);
+        return wrapCanalDeserializationSchema(canalSchema, originPhysicalFieldInfos, convertedPhysicalFieldInfos);
     }
 
     private static DeserializationSchema<Row> wrapCanalDeserializationSchema(
             DeserializationSchema<RowData> canalSchema,
-            boolean includeMetadata,
-            FieldInfo[] origFieldInfos,
-            String timestampFormatStandard
-    ) throws IOException, ClassNotFoundException {
-        FieldInfo[] allFields;
-        if (includeMetadata) {
-            allFields = new FieldInfo[origFieldInfos.length + ALL_SUPPORTED_METADATA_KEYS.size()];
-            System.arraycopy(origFieldInfos, 0, allFields, 0, origFieldInfos.length);
-            FieldInfo[] metadataFields = buildMetadataFields(timestampFormatStandard);
-            System.arraycopy(metadataFields, 0, allFields, origFieldInfos.length, metadataFields.length);
-        } else {
-            allFields = origFieldInfos;
-        }
-
-        FieldInfo[] convertedAllFields = convertDateToStringFormatInfo(allFields);
-        RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema =
-                new RowDataToRowDeserializationSchemaWrapper(canalSchema, convertedAllFields);
-        return new CustomDateFormatDeserializationSchemaWrapper(rowDataToRowSchema, extractFormatInfos(allFields));
-    }
-
-    private static CanalJsonDecodingFormat createCanalJsonDecodingFormat(
-            String database,
-            String table,
-            boolean ignoreParseErrors,
-            String timestampFormatStandard,
-            boolean includeMetadata
+            FieldInfo[] originPhysicalFieldInfos,
+            FieldInfo[] convertedPhysicalFieldInfos
     ) {
-        TimestampFormat timestampFormat = getTimestampFormatStandard(timestampFormatStandard);
-        CanalJsonDecodingFormat canalJsonDecodingFormat =
-                new CanalJsonDecodingFormat(database, table, ignoreParseErrors, timestampFormat);
-        if (includeMetadata) {
-            canalJsonDecodingFormat.applyReadableMetadata(ALL_SUPPORTED_METADATA_KEYS);
-        }
 
-        return canalJsonDecodingFormat;
+        RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema = new RowDataToRowDeserializationSchemaWrapper(
+                canalSchema,
+                getProducedFieldInfos(convertedPhysicalFieldInfos));
+        return new CustomDateFormatDeserializationSchemaWrapper(
+                rowDataToRowSchema,
+                extractFormatInfos(getProducedFieldInfos(originPhysicalFieldInfos)));
     }
 
-    private static FieldInfo[] buildMetadataFields(String timestampStandard) {
-        return new FieldInfo[]{
-                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(0), StringFormatInfo.INSTANCE),
-                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(1), StringFormatInfo.INSTANCE),
-                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(2),
-                        new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)),
-                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(3), new ArrayFormatInfo(StringFormatInfo.INSTANCE)),
-                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(4), new LocalZonedTimestampFormatInfo(timestampStandard)),
-                new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(5), new LocalZonedTimestampFormatInfo(timestampStandard))
-        };
-    }
+    public static FieldInfo[] getMetadataFieldInfos(FieldInfo[] fieldInfos) {
+        List<FieldInfo> metadataFieldInfos = new ArrayList<>();
+        Arrays.stream(fieldInfos)
+                .filter(fieldInfo -> fieldInfo instanceof BuiltInFieldInfo)
+                .forEach(fieldInfo -> {
+                    BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) fieldInfo;
+                    BuiltInField builtInField = builtInFieldInfo.getBuiltInField();
+                    switch (builtInField) {
+                        case MYSQL_METADATA_DATABASE:
+                            metadataFieldInfos.add(new FieldInfo(
+                                    ReadableMetadata.DATABASE.getKey(), StringFormatInfo.INSTANCE));
+                            break;
+                        case MYSQL_METADATA_TABLE:
+                            metadataFieldInfos.add(new FieldInfo(
+                                    ReadableMetadata.TABLE.getKey(), StringFormatInfo.INSTANCE));
+                            break;
+                        case MYSQL_METADATA_EVENT_TIME:
+                            metadataFieldInfos.add(new FieldInfo(
+                                    ReadableMetadata.EVENT_TIMESTAMP.getKey(), LongFormatInfo.INSTANCE));
+                            break;
+                        case MYSQL_METADATA_IS_DDL:
+                            metadataFieldInfos.add(new FieldInfo(
+                                    ReadableMetadata.IS_DDL.getKey(), BooleanFormatInfo.INSTANCE));
+                            break;
+                        case MYSQL_METADATA_EVENT_TYPE:
+                        case MYSQL_METADATA_DATA:
+                            break;
+                        default:
+                            throw new IllegalArgumentException(
+                                    "Unsupported builtin field '" + builtInField + "' in debezium deserialization");
+                    }
+                });
 
+        return metadataFieldInfos.toArray(new FieldInfo[0]);
+    }
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
index 8027822..a2b64a3 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
@@ -23,6 +23,7 @@ import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.checkW
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getProducedFieldInfos;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
 
 import java.io.IOException;
@@ -39,7 +40,6 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat;
 import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
@@ -129,13 +129,4 @@ public class DebeziumDeserializationSchemaBuilder {
 
         return metadataFieldInfos.toArray(new FieldInfo[0]);
     }
-
-    public static FieldInfo[] getProducedFieldInfos(FieldInfo[] physicalFieldInfos) {
-        List<FieldInfo> results = new ArrayList<>();
-        results.add(new FieldInfo(
-                "metadata",
-                new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE)));
-        results.addAll(Arrays.asList(physicalFieldInfos));
-        return results.toArray(new FieldInfo[0]);
-    }
 }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
index 1e71b0a..85ce0d3 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
@@ -51,7 +51,7 @@ public class DeserializationSchemaFactory {
             return buildAvroDeserializationSchema(extractNonBuiltInFieldInfos(fieldInfos, false));
         } else if (deserializationInfo instanceof CanalDeserializationInfo) {
             return CanalDeserializationSchemaBuilder.build(
-                    extractNonBuiltInFieldInfos(fieldInfos, false),
+                    fieldInfos,
                     (CanalDeserializationInfo) deserializationInfo);
         } else if (deserializationInfo instanceof DebeziumDeserializationInfo) {
             return DebeziumDeserializationSchemaBuilder.build(
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
index 9eb71b0..e827a8b 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
@@ -98,9 +98,11 @@ public class FieldMappingTransformer implements Serializable {
             case MYSQL_METADATA_TABLE:
                 return attributes.get(MysqlBinLogData.MYSQL_METADATA_TABLE);
             case MYSQL_METADATA_IS_DDL:
-                return BooleanFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_IS_DDL));
+                String isDdlStr = attributes.get(MysqlBinLogData.MYSQL_METADATA_IS_DDL);
+                return isDdlStr == null ? null : BooleanFormatInfo.INSTANCE.deserialize(isDdlStr);
             case MYSQL_METADATA_EVENT_TIME:
-                return LongFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME));
+                String eventTimeStr = attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME);
+                return eventTimeStr == null ? null : LongFormatInfo.INSTANCE.deserialize(eventTimeStr);
             case MYSQL_METADATA_EVENT_TYPE:
                 return kind.shortString();
             case MYSQL_METADATA_DATA:
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index d169fbc..eac8154 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.inlong.sort.singletenant.flink.utils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -31,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.DateFormatInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.RowFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
@@ -214,4 +217,13 @@ public class CommonUtils {
         return false;
     }
 
+    /** Prepends a string-to-string map field named "metadata" to a copy of the given fields. */
+    public static FieldInfo[] getProducedFieldInfos(FieldInfo[] physicalFieldInfos) {
+        FieldInfo metadataField = new FieldInfo(
+                "metadata", new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE));
+        List<FieldInfo> produced = new ArrayList<>(Arrays.asList(physicalFieldInfos));
+        produced.add(0, metadataField);
+        return produced.toArray(new FieldInfo[0]);
+    }
+
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
index b649ef0..b46d1df 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
@@ -18,10 +18,21 @@
 
 package org.apache.inlong.sort.singletenant.flink.deserialization;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.DateFormatInfo;
 import org.apache.inlong.sort.formats.common.FloatFormatInfo;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
@@ -30,23 +41,13 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.formats.json.MysqlBinLogData;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public class CanalDeserializationTest {
 
     private final FieldInfo[] fieldInfos = new FieldInfo[]{
@@ -62,27 +63,40 @@ public class CanalDeserializationTest {
                     new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
     };
 
+    private final FieldInfo[] fieldInfosWithBuiltinField = new FieldInfo[]{
+            new FieldInfo("id", IntFormatInfo.INSTANCE),
+            new FieldInfo("name", StringFormatInfo.INSTANCE),
+            new FieldInfo("description", StringFormatInfo.INSTANCE),
+            new FieldInfo("weight", FloatFormatInfo.INSTANCE), // built-in MySQL metadata fields follow
+            new BuiltInFieldInfo("database", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
+            new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
+            new BuiltInFieldInfo(
+                    "event-timestamp", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
+            new BuiltInFieldInfo("is-ddl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
+    };
+
     private Row generateTestRow() {
-        Row testRow = new Row(8);
-        testRow.setField(0, 1238123899121L);
-        testRow.setField(1, "testName");
+        Row testRow = new Row(9); // one leading map slot plus eight data fields
+        testRow.setField(0, new HashMap<>()); // empty map; presumably the metadata column -- confirm
+        testRow.setField(1, 1238123899121L);
+        testRow.setField(2, "testName");
 
         byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
-        testRow.setField(2, bytes);
+        testRow.setField(3, bytes);
 
-        testRow.setField(3, Date.valueOf("1990-10-14"));
-        testRow.setField(4, Time.valueOf("12:12:43"));
-        testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+        testRow.setField(4, Date.valueOf("1990-10-14"));
+        testRow.setField(5, Time.valueOf("12:12:43"));
+        testRow.setField(6, Timestamp.valueOf("1990-10-14 12:12:43"));
 
         Map<String, Long> map = new HashMap<>();
         map.put("flink", 123L);
-        testRow.setField(6, map);
+        testRow.setField(7, map);
 
         Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
         Map<String, Integer> innerMap = new HashMap<>();
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
-        testRow.setField(7, nestedMap);
+        testRow.setField(8, nestedMap);
 
         return testRow;
     }
@@ -170,40 +184,41 @@ public class CanalDeserializationTest {
                 + "}";
 
         byte[] testBytes = testCanalData.getBytes(StandardCharsets.UTF_8);
-        FieldInfo[] fieldInfos = new FieldInfo[]{
-                new FieldInfo("id", IntFormatInfo.INSTANCE),
-                new FieldInfo("name", StringFormatInfo.INSTANCE),
-                new FieldInfo("description", StringFormatInfo.INSTANCE),
-                new FieldInfo("weight", FloatFormatInfo.INSTANCE)
-        };
-
         DeserializationSchema<Row> schemaWithoutFilter = DeserializationSchemaFactory.build(
-                fieldInfos,
+                fieldInfosWithBuiltinField,
                 new CanalDeserializationInfo(null, null, false, "ISO_8601", true)
         );
         ListCollector<Row> collector1 = new ListCollector<>();
         schemaWithoutFilter.deserialize(testBytes, collector1);
         List<Row> innerList = collector1.getInnerList();
         assertEquals(3, innerList.size());
-        Row row = innerList.get(0);
-        assertEquals(101, row.getField(0));
-        assertEquals("scooter", row.getField(1).toString());
-        assertEquals("Small 2-wheel scooter", row.getField(2).toString());
-        assertEquals(3.14f, (Float) row.getField(3), 0);
-        assertEquals("inventory", row.getField(4).toString());
-        assertEquals("products2", row.getField(5).toString());
-        assertEquals(4, ((Map<?, ?>) row.getField(6)).size());
-        assertEquals("id", ((String[]) row.getField(7))[0]);
-        // "es" and "ts" are treated as local in flink-canal
-        assertEquals(1589373515477L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(8)).getMillisecond());
-        assertEquals(1589373515000L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(9)).getMillisecond());
+        assertEquals(generateTestRowWithMetadata(), innerList.get(0));
 
         DeserializationSchema<Row> schemaWithFilter = DeserializationSchemaFactory.build(
-                fieldInfos,
+                fieldInfosWithBuiltinField,
                 new CanalDeserializationInfo("NoExistDB", null, false, "ISO_8601", true)
         );
         ListCollector<Row> collector2 = new ListCollector<>();
         schemaWithFilter.deserialize(testBytes, collector2);
         assertTrue(collector2.getInnerList().isEmpty());
     }
+
+    /** Builds the row expected for the first record: the metadata map at index 0, then the columns. */
+    private Row generateTestRowWithMetadata() {
+        // A plain HashMap instead of double-brace initialization: no anonymous subclass that
+        // captures the enclosing test instance.
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_DATABASE, "inventory");
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_TABLE, "products2");
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME, "1589373515000");
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_IS_DDL, "false");
+
+        Row testRow = new Row(5);
+        testRow.setField(0, metadata);
+        testRow.setField(1, 101);
+        testRow.setField(2, "scooter");
+        testRow.setField(3, "Small 2-wheel scooter");
+        testRow.setField(4, 3.14f);
+        return testRow;
+    }
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
index a83c42b..e334df9 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
@@ -20,8 +20,15 @@ package org.apache.inlong.sort.singletenant.flink.deserialization;
 import org.apache.flink.types.Row;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
 import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
 import org.junit.Test;
 
@@ -51,4 +58,89 @@ public class DeserializationFunctionTest {
         assertEquals(testData, row.getField(0));
     }
 
+    /** Verifies that Canal JSON wrapped in an InLongMsg yields the data columns plus the built-in fields. */
+    @Test
+    public void testDeserializeCanalJson() throws Exception {
+        String testCanalData = "{\n"
+                + "    \"data\":[\n"
+                + "        {\n"
+                + "            \"id\":\"101\",\n"
+                + "            \"name\":\"scooter\",\n"
+                + "            \"description\":\"Small 2-wheel scooter\",\n"
+                + "            \"weight\":\"3.14\"\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"id\":\"102\",\n"
+                + "            \"name\":\"car battery\",\n"
+                + "            \"description\":\"12V car battery\",\n"
+                + "            \"weight\":\"8.1\"\n"
+                + "        },\n"
+                + "        {\n"
+                + "            \"id\":\"103\",\n"
+                + "            \"name\":\"12-pack drill bits\",\n"
+                + "            \"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\n"
+                + "            \"weight\":\"0.8\"\n"
+                + "        }\n"
+                + "    ],\n"
+                + "    \"database\":\"database\",\n"
+                + "    \"es\":1589373515000,\n"
+                + "    \"id\":3,\n"
+                + "    \"isDdl\":false,\n"
+                + "    \"mysqlType\":{\n"
+                + "        \"id\":\"INTEGER\",\n"
+                + "        \"name\":\"VARCHAR(255)\",\n"
+                + "        \"description\":\"VARCHAR(512)\",\n"
+                + "        \"weight\":\"FLOAT\"\n"
+                + "    },\n"
+                + "    \"old\":null,\n"
+                + "    \"pkNames\":[\n"
+                + "        \"id\"\n"
+                + "    ],\n"
+                + "    \"sql\":\"\",\n"
+                + "    \"sqlType\":{\n"
+                + "        \"id\":4,\n"
+                + "        \"name\":12,\n"
+                + "        \"description\":12,\n"
+                + "        \"weight\":7\n"
+                + "    },\n"
+                + "    \"table\":\"table\",\n"
+                + "    \"ts\":1589373515477,\n"
+                + "    \"type\":\"INSERT\"\n"
+                + "}";
+
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0&dt=" + System.currentTimeMillis() + "&iname=tid";
+        inLongMsg.addMsg(attrs, testCanalData.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        SerializedRecord serializedRecord = new SerializedRecord(1, inLongMsg.buildArray());
+
+        // The last five entries are built-in (metadata) fields rather than columns of the "data" rows.
+        FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("id", IntFormatInfo.INSTANCE),
+                new FieldInfo("name", StringFormatInfo.INSTANCE),
+                new FieldInfo("description", StringFormatInfo.INSTANCE),
+                new FieldInfo("weight", FloatFormatInfo.INSTANCE),
+                new BuiltInFieldInfo("database", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
+                new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
+                new BuiltInFieldInfo(
+                        "event-timestamp", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
+                new BuiltInFieldInfo("is-ddl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
+                new BuiltInFieldInfo("event-type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
+        };
+
+        DeserializationFunction function = new DeserializationFunction(
+                DeserializationSchemaFactory.build(
+                        fieldInfos,
+                        new CanalDeserializationInfo(null, null, false, "ISO_8601", false)),
+                new FieldMappingTransformer(new Configuration(), fieldInfos),
+                false);
+
+        ListCollector<Row> collector = new ListCollector<>();
+        function.processElement(serializedRecord, null, collector);
+        Row row = collector.getInnerList().get(0);
+
+        Row expected = Row.of(
+                101, "scooter", "Small 2-wheel scooter", 3.14f, "database", "table", 1589373515000L, false, "+I");
+        assertEquals(expected, row);
+    }
+
 }