You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/14 11:00:43 UTC

[incubator-inlong] branch master updated: [INLONG-3112][Manager] Support source metadata field for Sort (#3118)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b7f6fc4  [INLONG-3112][Manager] Support source metadata field for Sort (#3118)
b7f6fc4 is described below

commit b7f6fc47054ce236ab85479b005fa2d19e38ae76
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon Mar 14 19:00:38 2022 +0800

    [INLONG-3112][Manager] Support source metadata field for Sort (#3118)
---
 .../inlong/manager/client/api/SinkField.java       |  6 ++-
 .../manager/client/api/source/KafkaSource.java     | 13 ++++-
 .../client/api/util/InlongStreamSinkTransfer.java  |  4 +-
 .../api/util/InlongStreamSourceTransfer.java       | 10 +++-
 .../inlong/manager/common/enums/MetaFieldType.java | 63 ++++++++++++++++++++++
 .../manager/common/pojo/sink/SinkFieldRequest.java |  3 ++
 .../common/pojo/sink/SinkFieldResponse.java        |  3 ++
 .../common/pojo/source/kafka/KafkaSourceDTO.java   | 16 ++++++
 .../pojo/source/kafka/KafkaSourceRequest.java      | 12 +++++
 .../pojo/source/kafka/KafkaSourceResponse.java     | 12 +++++
 .../manager/dao/entity/StreamSinkFieldEntity.java  |  1 +
 .../mappers/StreamSinkFieldEntityMapper.xml        | 23 +++++---
 .../thirdparty/sort/util/FieldInfoUtils.java       | 51 ++++++++++++++----
 .../thirdparty/sort/util/SerializationUtils.java   |  4 ++
 .../main/resources/sql/apache_inlong_manager.sql   |  1 +
 .../manager-web/sql/apache_inlong_manager.sql      |  1 +
 16 files changed, 200 insertions(+), 23 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index 7a0b3e3..3dee543 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -33,10 +33,14 @@ public class SinkField extends StreamField {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
+    @ApiModelProperty("Is source meta field, 0: no, 1: yes")
+    private Integer isSourceMetaField = 0;
+
     public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
-            String fieldValue, String sourceFieldName, String sourceFieldType) {
+            String fieldValue, String sourceFieldName, String sourceFieldType, Integer isSourceMetaField) {
         super(index, fieldType, fieldName, fieldComment, fieldValue);
         this.sourceFieldName = sourceFieldName;
         this.sourceFieldType = sourceFieldType;
+        this.isSourceMetaField = isSourceMetaField == null ? 0 : isSourceMetaField; // null means not a meta field
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index 1e64253..61a4faf 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -24,8 +24,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
-import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.client.api.KafkaOffset;
+import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
@@ -68,4 +68,15 @@ public class KafkaSource extends StreamSource {
     @ApiModelProperty(value = "The strategy of auto offset reset")
     private KafkaOffset autoOffsetReset;
 
+    @ApiModelProperty("database pattern used for filter in canal format")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern used for filter in canal format")
+    private String tablePattern;
+
+    @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+    private boolean ignoreParseErrors = true;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 8c517c3..8214342 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -152,7 +152,8 @@ public class InlongStreamSinkTransfer {
                 sinkFieldResponse.getFieldName(),
                 sinkFieldResponse.getFieldComment(),
                 null, sinkFieldResponse.getSourceFieldName(),
-                sinkFieldResponse.getSourceFieldType())).collect(Collectors.toList());
+                sinkFieldResponse.getSourceFieldType(),
+                sinkFieldResponse.getIsSourceMetaField())).collect(Collectors.toList());
 
     }
 
@@ -239,6 +240,7 @@ public class InlongStreamSinkTransfer {
             sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
             sinkFieldRequest.setSourceFieldName(sinkField.getSourceFieldName());
             sinkFieldRequest.setSourceFieldType(sinkField.getSourceFieldType());
+            sinkFieldRequest.setIsSourceMetaField(sinkField.getIsSourceMetaField());
             sinkFieldRequests.add(sinkFieldRequest);
         }
         return sinkFieldRequests;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 9b58ddc..0294a79 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -21,12 +21,12 @@ import com.google.common.base.Joiner;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.KafkaOffset;
 import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.client.api.StreamSource.SyncType;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
-import org.apache.inlong.manager.client.api.KafkaOffset;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -95,6 +95,10 @@ public class InlongStreamSourceTransfer {
         kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
         kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
         kafkaSource.setSyncType(SyncType.FULL);
+        kafkaSource.setDatabasePattern(kafkaSourceResponse.getDatabasePattern());
+        kafkaSource.setTablePattern(kafkaSourceResponse.getTablePattern());
+        kafkaSource.setIgnoreParseErrors(kafkaSourceResponse.isIgnoreParseErrors());
+        kafkaSource.setTimestampFormatStandard(kafkaSourceResponse.getTimestampFormatStandard());
         return kafkaSource;
     }
 
@@ -183,6 +187,10 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setAutoOffsetReset(kafkaSource.getAutoOffsetReset().getName());
         sourceRequest.setGroupId(kafkaSource.getConsumerGroup());
         sourceRequest.setSerializationType(kafkaSource.getDataFormat().getName());
+        sourceRequest.setDatabasePattern(kafkaSource.getDatabasePattern());
+        sourceRequest.setTablePattern(kafkaSource.getTablePattern());
+        sourceRequest.setIgnoreParseErrors(kafkaSource.isIgnoreParseErrors());
+        sourceRequest.setTimestampFormatStandard(kafkaSource.getTimestampFormatStandard());
         return sourceRequest;
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
new file mode 100644
index 0000000..846ceac
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import lombok.Getter;
+
+@Getter
+public enum MetaFieldType {
+
+    /**
+     * Meta field named {@code database}
+     */
+    DATABASE("database", "meta field database used in canal json or mysql binlog and so on"),
+
+    /**
+     * Meta field named {@code data_time}
+     */
+    DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlog and so on"),
+
+    /**
+     * Meta field named {@code table}
+     */
+    TABLE("table", "meta field table used in canal json or mysql binlog and so on"),
+
+    /**
+     * Meta field named {@code event_time}
+     */
+    EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlog and so on"),
+
+    /**
+     * Meta field named {@code is_ddl}
+     */
+    IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlog and so on"),
+
+    /**
+     * Meta field named {@code event_type}
+     */
+    EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlog and so on");
+
+    private final String name;
+
+    private final String description;
+
+    MetaFieldType(String name, String description) {
+        this.name = name;
+        this.description = description;
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index c3aceb1..8efb7cf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -46,6 +46,9 @@ public class SinkFieldRequest {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
+    @ApiModelProperty("Is source meta field, 0: no, 1: yes")
+    private Integer isSourceMetaField = 0;
+
     @ApiModelProperty("Field order")
     private Short rankNum;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index cdaffe8..773d476 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -51,6 +51,9 @@ public class SinkFieldResponse {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
+    @ApiModelProperty("Is source meta field, 0: no, 1: yes")
+    private Integer isSourceMetaField = 0;
+
     @ApiModelProperty("Field order")
     private Short rankNum;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index f4fb9bb..2c5c28f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -71,6 +71,18 @@ public class KafkaSourceDTO {
     @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
     private String serializationType;
 
+    @ApiModelProperty("database pattern used for filter in canal format")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern used for filter in canal format")
+    private String tablePattern;
+
+    @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+    private boolean ignoreParseErrors;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard;
+
     /**
      * Get the dto instance from the request
      */
@@ -84,6 +96,10 @@ public class KafkaSourceDTO {
                 .topicPartitionOffset(request.getTopicPartitionOffset())
                 .autoOffsetReset(request.getAutoOffsetReset())
                 .serializationType(request.getSerializationType())
+                .databasePattern(request.getDatabasePattern())
+                .tablePattern(request.getTablePattern())
+                .ignoreParseErrors(request.isIgnoreParseErrors())
+                .timestampFormatStandard(request.getTimestampFormatStandard())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 84110d1..0b6a230 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -62,6 +62,18 @@ public class KafkaSourceRequest extends SourceRequest {
             notes = "including earliest, latest (the default), none")
     private String autoOffsetReset;
 
+    @ApiModelProperty("database pattern used for filter in canal format")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern used for filter in canal format")
+    private String tablePattern;
+
+    @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+    private boolean ignoreParseErrors = true;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
+
     public KafkaSourceRequest() {
         this.setSourceType(SourceType.KAFKA.toString());
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 34b178b..3055db1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -55,6 +55,18 @@ public class KafkaSourceResponse extends SourceResponse {
     @ApiModelProperty(value = "The strategy of auto offset reset")
     private String autoOffsetReset;
 
+    @ApiModelProperty("database pattern used for filter in canal format")
+    private String databasePattern;
+
+    @ApiModelProperty("table pattern used for filter in canal format")
+    private String tablePattern;
+
+    @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+    private boolean ignoreParseErrors;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard;
+
     public KafkaSourceResponse() {
         this.setSourceType(SourceType.KAFKA.name());
     }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 91f988a..98f888f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -37,6 +37,7 @@ public class StreamSinkFieldEntity implements Serializable {
     private Integer isRequired;
     private String sourceFieldName;
     private String sourceFieldType;
+    private Integer isSourceMetaField;
     private Short rankNum;
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index d647045..a1fd0bc 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -18,7 +18,8 @@
     under the License.
 -->
 
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper">
     <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity">
         <id column="id" jdbcType="INTEGER" property="id"/>
@@ -31,25 +32,30 @@
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
         <result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
+        <result column="is_source_meta_field" jdbcType="INTEGER" property="isSourceMetaField"/>
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, sink_id, field_name, field_type, field_comment,
-        source_field_name, source_field_type, rank_num, is_deleted
+        id
+        , sink_id, field_name, field_type, field_comment,
+        source_field_name, source_field_type, is_source_meta_field, rank_num, is_deleted
     </sql>
 
-    <insert id="insert"  useGeneratedKeys="true" keyProperty="id"
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity">
         insert into stream_sink_field (id, inlong_group_id, inlong_stream_id,
                                        sink_id, sink_type,
                                        field_name, field_type, field_comment,
-                                       source_field_name, source_field_type,
+                                       source_field_name, source_field_type, is_source_meta_field,
                                        rank_num, is_deleted)
-        values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+        values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
+                #{inlongStreamId,jdbcType=VARCHAR},
                 #{sinkId,jdbcType=INTEGER}, #{sinkType,jdbcType=VARCHAR},
-                #{fieldName,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
+                #{fieldName,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
+                #{fieldComment,jdbcType=VARCHAR},
                 #{sourceFieldName,jdbcType=VARCHAR}, #{sourceFieldType,jdbcType=VARCHAR},
+                #{isSourceMetaField,jdbcType=INTEGER},
                 #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertAll">
@@ -58,7 +64,7 @@
         inlong_stream_id, sink_id,
         sink_type, field_name,
         field_type, field_comment,
-        source_field_name, source_field_type,
+        source_field_name, source_field_type,is_source_meta_field,
         rank_num, is_deleted
         )
         values
@@ -69,6 +75,7 @@
             #{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
             #{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
+            #{item.isSourceMetaField,jdbcType=INTEGER},
             #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
             )
         </foreach>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
index 0bb4a7d..b7de742 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.thirdparty.sort.util;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.MetaFieldType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
@@ -56,12 +57,12 @@ public class FieldInfoUtils {
     public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new HashMap<>();
 
     static {
-        BUILT_IN_FIELD_MAP.put("data_time", BuiltInField.DATA_TIME);
-        BUILT_IN_FIELD_MAP.put("database", BuiltInField.MYSQL_METADATA_DATABASE);
-        BUILT_IN_FIELD_MAP.put("table", BuiltInField.MYSQL_METADATA_TABLE);
-        BUILT_IN_FIELD_MAP.put("event_time", BuiltInField.MYSQL_METADATA_EVENT_TIME);
-        BUILT_IN_FIELD_MAP.put("is_ddl", BuiltInField.MYSQL_METADATA_IS_DDL);
-        BUILT_IN_FIELD_MAP.put("event_type", BuiltInField.MYSQL_METADATA_EVENT_TYPE);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(), BuiltInField.DATA_TIME);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.DATABASE.getName(), BuiltInField.MYSQL_METADATA_DATABASE);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.TABLE.getName(), BuiltInField.MYSQL_METADATA_TABLE);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TIME.getName(), BuiltInField.MYSQL_METADATA_EVENT_TIME);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(), BuiltInField.MYSQL_METADATA_IS_DDL);
+        BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(), BuiltInField.MYSQL_METADATA_EVENT_TYPE);
     }
 
     /**
@@ -78,7 +79,8 @@ public class FieldInfoUtils {
             boolean duplicate = false;
             for (SinkFieldResponse field : fieldList) {
                 // If the field name equals to build-in field, new a build-in field info
-                FieldInfo sourceFieldInfo = getFieldInfo(field.getSourceFieldName(), field.getSourceFieldType());
+                FieldInfo sourceFieldInfo = getFieldInfo(field.getSourceFieldName(),
+                        field.getSourceFieldType(), Integer.valueOf(1).equals(field.getIsSourceMetaField()));
                 sourceFields.add(sourceFieldInfo);
 
                 // Get sink field info
@@ -86,7 +88,8 @@ public class FieldInfoUtils {
                 if (sinkFieldName.equals(partitionField)) {
                     duplicate = true;
                 }
-                FieldInfo sinkFieldInfo = getFieldInfo(field.getFieldName(), field.getFieldType());
+                FieldInfo sinkFieldInfo = getSinkFieldInfo(field.getFieldName(), field.getFieldType(),
+                        field.getSourceFieldName(), Integer.valueOf(1).equals(field.getIsSourceMetaField()));
                 sinkFields.add(sinkFieldInfo);
 
                 fieldMappingUnitList.add(new FieldMappingUnit(sourceFieldInfo, sinkFieldInfo));
@@ -108,15 +111,41 @@ public class FieldInfoUtils {
      * @apiNote If the field name equals to build-in field, new a build-in field info
      */
     private static FieldInfo getFieldInfo(String fieldName, String fieldType) {
+        return getFieldInfo(fieldName, fieldType, false);
+    }
+
+    /**
+     * Get field info by the given field name and type.
+     *
+     * @apiNote Creates a built-in field info only if isBuiltin is true and the field name is a built-in field
+     */
+    private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin) {
         FieldInfo fieldInfo;
         BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
         FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
-        if (builtInField == null) {
-            fieldInfo = new FieldInfo(fieldName, formatInfo);
-        } else {
+        if (isBuiltin && builtInField != null) {
             fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+        } else {
+            fieldInfo = new FieldInfo(fieldName, formatInfo);
         }
+        return fieldInfo;
+    }
 
+    /**
+     * Get sink field info by the given sink field name and type.
+     *
+     * @apiNote Creates a built-in field info only if isBuiltin is true and sourceFieldName is a built-in field
+     */
+    private static FieldInfo getSinkFieldInfo(String fieldName, String fieldType,
+            String sourceFieldName, boolean isBuiltin) {
+        FieldInfo fieldInfo;
+        BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(sourceFieldName);
+        FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
+        if (isBuiltin && builtInField != null) {
+            fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+        } else {
+            fieldInfo = new FieldInfo(fieldName, formatInfo);
+        }
         return fieldInfo;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index d0da64e..cea3110 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
@@ -97,6 +98,9 @@ public class SerializationUtils {
                 return new AvroDeserializationInfo();
             case JSON:
                 return new JsonDeserializationInfo();
+            case CANAL:
+                return new CanalDeserializationInfo(source.getDatabasePattern(), source.getTablePattern(),
+                        source.isIgnoreParseErrors(), source.getTimestampFormatStandard(), true);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported serializationType for Kafka source: %s", serializationType));
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index b83f493..00272a6 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -641,6 +641,7 @@ CREATE TABLE `stream_sink_field`
     `sink_type`         varchar(15)  NOT NULL COMMENT 'Sink type',
     `source_field_name` varchar(50)   DEFAULT NULL COMMENT 'Source field name',
     `source_field_type` varchar(50)   DEFAULT NULL COMMENT 'Source field type',
+    `is_source_meta_field` int(3)     DEFAULT '0' COMMENT 'source field is meta field',
     `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e3746c1..e41a53b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -673,6 +673,7 @@ CREATE TABLE `stream_sink_field`
     `sink_type`         varchar(15)  NOT NULL COMMENT 'Sink type',
     `source_field_name` varchar(50)   DEFAULT NULL COMMENT 'Source field name',
     `source_field_type` varchar(50)   DEFAULT NULL COMMENT 'Source field type',
+    `is_source_meta_field` int(3)     DEFAULT '0' COMMENT 'source field is meta field',
     `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',