You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/01 06:02:59 UTC

[incubator-inlong] branch master updated: [INLONG-4458][Sort][Manager] Unify the meta field naming (#4460)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 87709bb70 [INLONG-4458][Sort][Manager] Unify the meta field naming (#4460)
87709bb70 is described below

commit 87709bb706222620f40c66d7db44fe4ad7c529b1
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Wed Jun 1 14:02:55 2022 +0800

    [INLONG-4458][Sort][Manager] Unify the meta field naming (#4460)
---
 .../manager/client/api/impl/InlongStreamImpl.java  |   3 +-
 .../inlong/manager/common/enums/MetaFieldType.java | 103 -----------
 .../manager/common/pojo/stream/StreamField.java    |  20 +-
 .../dao/entity/InlongStreamFieldEntity.java        |   1 +
 .../manager/dao/entity/StreamSinkFieldEntity.java  |   1 +
 .../dao/entity/StreamSourceFieldEntity.java        |   2 +
 .../dao/entity/StreamTransformFieldEntity.java     |   2 +
 .../mappers/InlongStreamFieldEntityMapper.xml      |  19 +-
 .../mappers/StreamSinkFieldEntityMapper.xml        |  17 +-
 .../mappers/StreamSourceFieldEntityMapper.xml      |  51 ++++--
 .../mappers/StreamTransformFieldEntityMapper.xml   | 110 ++++++-----
 .../manager/service/sort/util/FieldInfoUtils.java  |  93 +++-------
 .../main/resources/sql/apache_inlong_manager.sql   |   4 +
 .../manager-web/sql/apache_inlong_manager.sql      |   4 +
 .../inlong/sort/protocol/BuiltInFieldInfo.java     | 173 ------------------
 .../org/apache/inlong/sort/protocol/FieldInfo.java |   5 +-
 .../apache/inlong/sort/protocol/MetaFieldInfo.java |  11 +-
 .../protocol/transformation/FunctionParam.java     |   4 +-
 .../inlong/sort/protocol/BuiltInFieldInfoTest.java |  32 ----
 .../apache/inlong/sort/protocol/FieldInfoTest.java |   2 +-
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 202 ---------------------
 .../apache/inlong/sort/parser/AllMigrateTest.java  |   8 +-
 .../sort/parser/DistinctNodeSqlParseTest.java      |  11 +-
 .../inlong/sort/parser/MetaFieldSyncTest.java      | 138 +++++---------
 24 files changed, 233 insertions(+), 783 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index cc3e72cf6..ea1770e0a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -85,7 +85,8 @@ public class InlongStreamImpl implements InlongStream {
                                     fieldInfo.getFieldComment(),
                                     fieldInfo.getFieldValue(),
                                     fieldInfo.getIsMetaField(),
-                                    fieldInfo.getFieldFormat()
+                                    fieldInfo.getMetaFieldName(),
+                                    fieldInfo.getOriginNodeName()
                             )
                     ).collect(Collectors.toList());
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
deleted file mode 100644
index 3a52e41a0..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.enums;
-
-import lombok.Getter;
-
-@Getter
-public enum MetaFieldType {
-
-    /**
-     * database
-     */
-    DATABASE("database", "meta field database used in canal json or mysql binlog and so on"),
-
-    /**
-     * processing_time
-     */
-    PROCESSING_TIME("processing_time", "meta field processing_time describe such moment the record be processed"),
-
-    /**
-     * data_time
-     */
-    DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlog and so on"),
-
-    /**
-     * table
-     */
-    TABLE("table", "meta field table used in canal json or mysql binlog and so on"),
-
-    /**
-     * event_time
-     */
-    EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlog and so on"),
-
-    /**
-     * is_ddl
-     */
-    IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlog and so on"),
-
-    /**
-     * event_type
-     */
-    EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlog and so on"),
-
-    /**
-     * data
-     */
-    MYSQL_DATA("data", "MySQL binlog data Row"),
-
-    /**
-     * update_before
-     */
-    UPDATE_BEFORE("update_before", "The value of the field before update"),
-
-    /**
-     * Batch id of binlog
-     */
-    BATCH_ID("batch_id", "Batch id of binlog"),
-
-    /**
-     * sql_type
-     */
-    SQL_TYPE("sql_type", "Mapping of sql_type table fields to java data type IDs"),
-
-    /**
-     * ts
-     */
-    TS("ts", "The current time when the ROW was received and processed"),
-
-    /**
-     * mysql_type
-     */
-    MYSQL_TYPE("mysql_type", "The table structure"),
-
-    /**
-     * pk_names
-     */
-    PK_NAMES("pk_names", "Primary key field name");
-
-    private final String name;
-
-    private final String description;
-
-    MetaFieldType(String name, String description) {
-        this.name = name;
-        this.description = description;
-    }
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
index 02a5e9358..6d386f71b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
@@ -22,8 +22,6 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.MetaFieldType;
 
 /**
  * Stream filed, including field name, field type, etc.
@@ -34,12 +32,6 @@ import org.apache.inlong.manager.common.enums.MetaFieldType;
 @ApiModel("Stream field configuration")
 public class StreamField {
 
-    public static final StreamField PROCESSING_TIME = new StreamField(
-            100,
-            FieldType.BIGINT.toString(),
-            MetaFieldType.PROCESSING_TIME.getName(),
-            null, null, 1);
-
     @ApiModelProperty("Field index")
     private Integer id;
 
@@ -70,6 +62,9 @@ public class StreamField {
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
+    @ApiModelProperty(value = "Meta field name")
+    private String metaFieldName;
+
     @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
             + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
     private String fieldFormat;
@@ -92,22 +87,25 @@ public class StreamField {
     }
 
     public StreamField(int index, String fieldType, String fieldName, String fieldComment, String fieldValue,
-            Integer isMetaField) {
+            Integer isMetaField, String metaFieldName) {
         this(index, fieldType, fieldName, fieldComment, fieldValue);
         this.isMetaField = isMetaField;
+        this.metaFieldName = metaFieldName;
     }
 
     public StreamField(int index, String fieldType, String fieldName, String fieldComment, String fieldValue,
-            Integer isMetaField, String originNodeName) {
+            Integer isMetaField, String metaFieldName, String originNodeName) {
         this(index, fieldType, fieldName, fieldComment, fieldValue);
         this.isMetaField = isMetaField;
+        this.metaFieldName = metaFieldName;
         this.originNodeName = originNodeName;
     }
 
     public StreamField(int index, String fieldType, String fieldName, String fieldComment, String fieldValue,
-            Integer isMetaField, String originNodeName, String originFieldName) {
+            Integer isMetaField, String metaFieldName, String originNodeName, String originFieldName) {
         this(index, fieldType, fieldName, fieldComment, fieldValue);
         this.isMetaField = isMetaField;
+        this.metaFieldName = metaFieldName;
         this.originNodeName = originNodeName;
         this.originFieldName = originFieldName;
     }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
index e2a1f8ecb..ffd3fff9b 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
@@ -38,6 +38,7 @@ public class InlongStreamFieldEntity implements Serializable {
     private String fieldType;
     private String fieldComment;
     private Integer isMetaField;
+    private String metaFieldName;
     private String fieldFormat;
     private Short rankNum;
     private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 1482c4f07..31e9a9275 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -43,6 +43,7 @@ public class StreamSinkFieldEntity implements Serializable {
     private String extParams;
 
     private Integer isMetaField;
+    private String metaFieldName;
     private String fieldFormat;
     private Short rankNum;
     private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index 665812386..5a766936c 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -48,6 +48,8 @@ public class StreamSourceFieldEntity implements Serializable {
 
     private Integer isMetaField;
 
+    private String metaFieldName;
+
     private String fieldFormat;
 
     private Integer rankNum;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index 49ed97fa1..aa9d1d5d6 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -49,6 +49,8 @@ public class StreamTransformFieldEntity implements Serializable {
 
     private Integer isMetaField;
 
+    private String metaFieldName;
+
     private String fieldFormat;
 
     private Integer rankNum;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index a335d932b..89abb39dc 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -31,25 +31,26 @@
         <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
         <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
         <result column="rank_num" jdbcType="INTEGER" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id
-        , inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
-        pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num, is_deleted
+        id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value, pre_expression,
+        field_type, field_comment, is_meta_field, meta_field_name, field_format, rank_num, is_deleted
     </sql>
 
     <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
         insert into inlong_stream_field (id, inlong_group_id, inlong_stream_id,
                                          is_predefined_field, field_name, field_value,
                                          pre_expression, field_type, field_comment,
-                                         is_meta_field, field_format, rank_num, is_deleted)
+                                         is_meta_field, meta_field_name, field_format,
+                                         rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{isPredefinedField,jdbcType=INTEGER}, #{fieldName,jdbcType=VARCHAR}, #{fieldValue,jdbcType=VARCHAR},
                 #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
-                #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+                #{isMetaField,jdbcType=SMALLINT}, #{metaFieldName,jdbcType=VARCHAR}, #{fieldFormat,jdbcType=VARCHAR},
                 #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <!-- Bulk insert, update if it exists -->
@@ -57,14 +58,15 @@
         insert into inlong_stream_field (
         id, inlong_group_id, inlong_stream_id, is_predefined_field,
         field_name, field_value, pre_expression, field_type,
-        field_comment, is_meta_field, field_format, rank_num, is_deleted
+        field_comment, is_meta_field, meta_field_name, field_format,
+        rank_num, is_deleted
         )
         values
         <foreach collection="fieldList" index="index" item="item" separator=",">
             (
             #{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
             #{item.fieldName}, #{item.fieldValue}, #{item.preExpression}, #{item.fieldType},
-            #{item.fieldComment}, #{item.isMetaField}, #{item.fieldFormat},
+            #{item.fieldComment}, #{item.isMetaField}, #{item.metaFieldName}, #{item.fieldFormat},
             #{item.rankNum}, #{item.isDeleted}
             )
         </foreach>
@@ -78,6 +80,7 @@
         pre_expression = values(pre_expression),
         field_type = values(field_type),
         is_meta_field = values(is_meta_field),
+        meta_field_name = values(meta_field_name),
         field_format = values(field_format),
         field_comment = values(field_comment),
         rank_num = values(rank_num),
@@ -123,4 +126,4 @@
           and is_deleted = 0
     </delete>
 
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 73ae39791..5587a8214 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -33,13 +33,14 @@
         <result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
         <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
         id, sink_id, field_name, field_type, field_comment, source_field_name, source_field_type,
-        ext_params, is_meta_field, field_format, rank_num, is_deleted
+        ext_params, is_meta_field, meta_field_name, field_format, rank_num, is_deleted
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -49,7 +50,8 @@
                                        sink_type, field_name,
                                        field_type, field_comment,
                                        source_field_name, source_field_type,
-                                       ext_params, is_meta_field, field_format,
+                                       ext_params, is_meta_field,
+                                       meta_field_name, field_format,
                                        rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
                 #{inlongStreamId,jdbcType=VARCHAR}, #{sinkId,jdbcType=INTEGER},
@@ -57,8 +59,8 @@
                 #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
                 #{sourceFieldName,jdbcType=VARCHAR}, #{sourceFieldType,jdbcType=VARCHAR},
                 #{extParams,jdbcType=LONGVARCHAR}, #{isMetaField,jdbcType=SMALLINT},
-                #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT},
-                #{isDeleted,jdbcType=INTEGER})
+                #{metaFieldName,jdbcType=VARCHAR}, #{fieldFormat,jdbcType=VARCHAR},
+                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertAll">
         insert into stream_sink_field (
@@ -67,7 +69,8 @@
         sink_type, field_name,
         field_type, field_comment,
         source_field_name, source_field_type,
-        ext_params, is_meta_field, field_format,
+        ext_params, is_meta_field,
+        meta_field_name, field_format,
         rank_num, is_deleted
         )
         values
@@ -78,8 +81,8 @@
             #{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
             #{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
-            #{item.extParams,jdbcType=LONGVARCHAR},
-            #{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
+            #{item.extParams,jdbcType=LONGVARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
+            #{item.metaFieldName,jdbcType=VARCHAR}, #{item.fieldFormat,jdbcType=VARCHAR},
             #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
             )
         </foreach>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index 97e60414c..0bf9acd9b 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -32,15 +32,14 @@
         <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
         <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id
-        , inlong_group_id, inlong_stream_id, source_id, source_type, field_name, field_value,
-    pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num, 
-    is_deleted
+        id, inlong_group_id, inlong_stream_id, source_id, source_type, field_name, field_value, pre_expression,
+        field_type, field_comment, is_meta_field, meta_field_name, field_format, rank_num, is_deleted
     </sql>
     <select id="selectBySourceId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         select
@@ -59,13 +58,13 @@
         insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
                                          source_id, source_type, field_name,
                                          field_value, pre_expression, field_type,
-                                         field_comment, is_meta_field, field_format,
-                                         rank_num, is_deleted)
+                                         field_comment, is_meta_field, meta_field_name,
+                                         field_format, rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
                 #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
-                #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
-                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+                #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{metaFieldName,jdbcType=VARCHAR},
+                #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
@@ -104,6 +103,9 @@
             <if test="isMetaField != null">
                 is_meta_field,
             </if>
+            <if test="metaFieldName != null">
+                meta_field_name,
+            </if>
             <if test="fieldFormat != null">
                 field_format,
             </if>
@@ -148,6 +150,9 @@
             <if test="isMetaField != null">
                 #{isMetaField,jdbcType=SMALLINT},
             </if>
+            <if test="metaFieldName != null">
+                #{metaFieldName,jdbcType=VARCHAR},
+            </if>
             <if test="fieldFormat != null">
                 #{fieldFormat,jdbcType=VARCHAR},
             </if>
@@ -193,6 +198,9 @@
             <if test="isMetaField != null">
                 is_meta_field = #{isMetaField,jdbcType=SMALLINT},
             </if>
+            <if test="metaFieldName != null">
+                meta_field_name = #{metaFieldName,jdbcType=VARCHAR},
+            </if>
             <if test="fieldFormat != null">
                 field_format = #{fieldFormat,jdbcType=VARCHAR},
             </if>
@@ -217,6 +225,7 @@
             field_type       = #{fieldType,jdbcType=VARCHAR},
             field_comment    = #{fieldComment,jdbcType=VARCHAR},
             is_meta_field    = #{isMetaField,jdbcType=SMALLINT},
+            meta_field_name  = #{metaFieldName,jdbcType=VARCHAR},
             field_format     = #{fieldFormat,jdbcType=VARCHAR},
             rank_num         = #{rankNum,jdbcType=SMALLINT},
             is_deleted       = #{isDeleted,jdbcType=INTEGER}
@@ -224,21 +233,23 @@
     </update>
 
     <insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
-        insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
-        source_id, source_type, field_name,
-        field_value, pre_expression, field_type,
-        field_comment, is_meta_field, field_format,
-        rank_num, is_deleted)
+        insert into stream_source_field (id, inlong_group_id,
+        inlong_stream_id, source_id,
+        source_type, field_name,
+        field_value, pre_expression,
+        field_type, field_comment,
+        is_meta_field, meta_field_name,
+        field_format, rank_num, is_deleted)
         values
         <foreach collection="list" index="index" item="item" separator=",">
             (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
-            #{item.inlongStreamId,jdbcType=VARCHAR},
-            #{item.sourceId,jdbcType=INTEGER}, #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
+            #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.sourceId,jdbcType=INTEGER},
+            #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
-            #{item.fieldType,jdbcType=VARCHAR},
-            #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
-            #{item.fieldFormat,jdbcType=VARCHAR},
-            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
+            #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
+            #{item.isMetaField,jdbcType=SMALLINT}, #{item.metaFieldName,jdbcType=VARCHAR},
+            #{item.fieldFormat,jdbcType=VARCHAR}, #{item.rankNum,jdbcType=SMALLINT},
+            #{item.isDeleted,jdbcType=INTEGER})
         </foreach>
     </insert>
 
@@ -247,4 +258,4 @@
         from stream_source_field
         where source_id = #{sourceId,jdbcType=INTEGER}
     </delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index 015777dce..f35c28675 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -32,17 +32,17 @@
         <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
         <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
-        <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
-        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
         <result column="origin_node_name" jdbcType="VARCHAR" property="originNodeName"/>
         <result column="origin_field_name" jdbcType="VARCHAR" property="originFieldName"/>
+        <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id
-        , inlong_group_id, inlong_stream_id, transform_id, transform_type, field_name,
-    field_value, pre_expression, field_type, field_comment, is_meta_field, field_format, 
-    rank_num, is_deleted, origin_node_name, origin_field_name
+        id, inlong_group_id, inlong_stream_id, transform_id, transform_type, field_name,
+        field_value, pre_expression, field_type, field_comment, is_meta_field, meta_field_name,
+        field_format, origin_node_name, origin_field_name, rank_num, is_deleted
     </sql>
     <select id="selectByTransformId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
         select
@@ -77,15 +77,17 @@
         insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
                                             transform_id, transform_type, field_name,
                                             field_value, pre_expression, field_type,
-                                            field_comment, is_meta_field, field_format,
-                                            rank_num, is_deleted, origin_node_name,
-                                            origin_field_name)
+                                            field_comment, is_meta_field,
+                                            meta_field_name, field_format,
+                                            origin_node_name, origin_field_name,
+                                            rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{transformId,jdbcType=INTEGER}, #{transformType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
                 #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
-                #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
-                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER}, #{originNodeName,jdbcType=VARCHAR},
-                #{originFieldName,jdbcType=VARCHAR})
+                #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT},
+                #{metaFieldName,jdbcType=VARCHAR}, #{fieldFormat,jdbcType=VARCHAR},
+                #{originNodeName,jdbcType=VARCHAR}, #{originFieldName,jdbcType=VARCHAR},
+                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
@@ -124,21 +126,24 @@
             <if test="isMetaField != null">
                 is_meta_field,
             </if>
+            <if test="metaFieldName != null">
+                meta_field_name,
+            </if>
             <if test="fieldFormat != null">
                 field_format,
             </if>
-            <if test="rankNum != null">
-                rank_num,
-            </if>
-            <if test="isDeleted != null">
-                is_deleted,
-            </if>
             <if test="originNodeName != null">
                 origin_node_name,
             </if>
             <if test="originFieldName != null">
                 origin_field_name,
             </if>
+            <if test="rankNum != null">
+                rank_num,
+            </if>
+            <if test="isDeleted != null">
+                is_deleted,
+            </if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -174,21 +179,24 @@
             <if test="isMetaField != null">
                 #{isMetaField,jdbcType=SMALLINT},
             </if>
+            <if test="metaFieldName != null">
+                #{metaFieldName,jdbcType=VARCHAR},
+            </if>
             <if test="fieldFormat != null">
                 #{fieldFormat,jdbcType=VARCHAR},
             </if>
-            <if test="rankNum != null">
-                #{rankNum,jdbcType=SMALLINT},
-            </if>
-            <if test="isDeleted != null">
-                #{isDeleted,jdbcType=INTEGER},
-            </if>
             <if test="originNodeName != null">
                 #{originNodeName,jdbcType=VARCHAR},
             </if>
             <if test="originFieldName != null">
                 #{originFieldName,jdbcType=VARCHAR},
             </if>
+            <if test="rankNum != null">
+                #{rankNum,jdbcType=SMALLINT},
+            </if>
+            <if test="isDeleted != null">
+                #{isDeleted,jdbcType=INTEGER},
+            </if>
         </trim>
     </insert>
     <update id="updateByPrimaryKeySelective"
@@ -225,21 +233,24 @@
             <if test="isMetaField != null">
                 is_meta_field = #{isMetaField,jdbcType=SMALLINT},
             </if>
+            <if test="metaFieldName != null">
+                meta_field_name = #{metaFieldName,jdbcType=VARCHAR},
+            </if>
             <if test="fieldFormat != null">
                 field_format = #{fieldFormat,jdbcType=VARCHAR},
             </if>
-            <if test="rankNum != null">
-                rank_num = #{rankNum,jdbcType=SMALLINT},
-            </if>
-            <if test="isDeleted != null">
-                is_deleted = #{isDeleted,jdbcType=INTEGER},
-            </if>
             <if test="originNodeName != null">
                 origin_node_name = #{originNodeName,jdbcType=VARCHAR},
             </if>
             <if test="originFieldName != null">
                 origin_field_name = #{originFieldName,jdbcType=VARCHAR},
             </if>
+            <if test="rankNum != null">
+                rank_num = #{rankNum,jdbcType=SMALLINT},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
         </set>
         where id = #{id,jdbcType=INTEGER}
     </update>
@@ -255,33 +266,36 @@
             field_type        = #{fieldType,jdbcType=VARCHAR},
             field_comment     = #{fieldComment,jdbcType=VARCHAR},
             is_meta_field     = #{isMetaField,jdbcType=SMALLINT},
+            meta_field_name   = #{metaFieldName,jdbcType=VARCHAR},
             field_format      = #{fieldFormat,jdbcType=VARCHAR},
-            rank_num          = #{rankNum,jdbcType=SMALLINT},
-            is_deleted        = #{isDeleted,jdbcType=INTEGER},
             origin_node_name  = #{originNodeName,jdbcType=VARCHAR},
-            origin_field_name = #{originFieldName,jdbcType=VARCHAR}
+            origin_field_name = #{originFieldName,jdbcType=VARCHAR},
+            rank_num          = #{rankNum,jdbcType=SMALLINT},
+            is_deleted        = #{isDeleted,jdbcType=INTEGER}
         where id = #{id,jdbcType=INTEGER}
     </update>
 
     <insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
-        insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
-        transform_id, transform_type, field_name,
-        field_value, pre_expression, field_type,
-        field_comment, is_meta_field, field_format,
-        rank_num, is_deleted, origin_node_name,
-        origin_field_name)
+        insert into stream_transform_field (id, inlong_group_id,
+        inlong_stream_id, transform_id,
+        transform_type, field_name,
+        field_value, pre_expression,
+        field_type, field_comment,
+        is_meta_field, meta_field_name,
+        field_format, origin_node_name,
+        origin_field_name,
+        rank_num, is_deleted)
         values
         <foreach collection="list" index="index" item="item" separator=",">
             (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
-            #{item.inlongStreamId,jdbcType=VARCHAR},
-            #{item.transformId,jdbcType=INTEGER}, #{item.transformType,jdbcType=VARCHAR},
-            #{item.fieldName,jdbcType=VARCHAR},
+            #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.transformId,jdbcType=INTEGER},
+            #{item.transformType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
-            #{item.fieldType,jdbcType=VARCHAR},
-            #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
-            #{item.fieldFormat,jdbcType=VARCHAR},
-            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER},
-            #{item.originNodeName,jdbcType=VARCHAR}, #{item.originFieldName,jdbcType=VARCHAR})
+            #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
+            #{item.isMetaField,jdbcType=SMALLINT}, #{item.metaFieldName,jdbcType=VARCHAR},
+            #{item.fieldFormat,jdbcType=VARCHAR}, #{item.originNodeName,jdbcType=VARCHAR},
+            #{item.originFieldName,jdbcType=VARCHAR},
+            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
         </foreach>
     </insert>
 
@@ -290,4 +304,4 @@
         from stream_transform_field
         where transform_id = #{transformId,jdbcType=INTEGER}
     </delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index 3f28cadb8..e9859592a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.service.sort.util;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.MetaFieldType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
@@ -38,15 +37,13 @@ import org.apache.inlong.sort.formats.common.ShortFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo.MetaField;
 import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Util for sort field info.
@@ -54,49 +51,27 @@ import java.util.Map;
 @Slf4j
 public class FieldInfoUtils {
 
-    /**
-     * Built in field map, key is field name, value is built in field name
-     */
-    public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new LinkedHashMap<>();
-
-    static {
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(), BuiltInField.DATA_TIME);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.DATABASE.getName(), BuiltInField.MYSQL_METADATA_DATABASE);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.TABLE.getName(), BuiltInField.MYSQL_METADATA_TABLE);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TIME.getName(), BuiltInField.MYSQL_METADATA_EVENT_TIME);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(), BuiltInField.MYSQL_METADATA_IS_DDL);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(), BuiltInField.MYSQL_METADATA_EVENT_TYPE);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.PROCESSING_TIME.getName(), BuiltInField.PROCESS_TIME);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.UPDATE_BEFORE.getName(), BuiltInField.METADATA_UPDATE_BEFORE);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.BATCH_ID.getName(), BuiltInField.METADATA_BATCH_ID);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.SQL_TYPE.getName(), BuiltInField.METADATA_SQL_TYPE);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.TS.getName(), BuiltInField.METADATA_TS);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_TYPE.getName(), BuiltInField.METADATA_MYSQL_TYPE);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.PK_NAMES.getName(), BuiltInField.METADATA_PK_NAMES);
-        BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(), BuiltInField.MYSQL_METADATA_DATA);
-    }
-
     public static FieldInfo parseSinkFieldInfo(SinkField sinkField, String nodeId) {
-        boolean isBuiltIn = sinkField.getIsMetaField() == 1;
+        boolean isMetaField = sinkField.getIsMetaField() == 1;
         FieldInfo fieldInfo = getFieldInfo(sinkField.getFieldName(),
-                sinkField.getFieldType(),
-                isBuiltIn, sinkField.getFieldFormat());
+                sinkField.getFieldType(), isMetaField, sinkField.getMetaFieldName(),
+                sinkField.getFieldFormat());
         fieldInfo.setNodeId(nodeId);
         return fieldInfo;
     }
 
     public static FieldInfo parseStreamFieldInfo(StreamField streamField, String nodeId) {
-        boolean isBuiltIn = streamField.getIsMetaField() == 1;
-        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(), isBuiltIn,
-                streamField.getFieldFormat());
+        boolean isMetaField = streamField.getIsMetaField() == 1;
+        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(),
+                isMetaField, streamField.getMetaFieldName(), streamField.getFieldFormat());
         fieldInfo.setNodeId(nodeId);
         return fieldInfo;
     }
 
     public static FieldInfo parseStreamField(StreamField streamField) {
-        boolean isBuiltIn = streamField.getIsMetaField() == 1;
-        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(), isBuiltIn,
-                streamField.getFieldFormat());
+        boolean isMetaField = streamField.getIsMetaField() == 1;
+        FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(),
+                isMetaField, streamField.getMetaFieldName(), streamField.getFieldFormat());
         fieldInfo.setNodeId(streamField.getOriginNodeName());
         return fieldInfo;
     }
@@ -112,7 +87,7 @@ public class FieldInfoUtils {
         // Set source field info list.
         for (StreamField field : streamFieldList) {
             FieldInfo sourceField = getFieldInfo(field.getFieldName(), field.getFieldType(),
-                    field.getIsMetaField() == 1, field.getFieldFormat());
+                    field.getIsMetaField() == 1, field.getMetaFieldName(), field.getFieldFormat());
             sourceFields.add(sourceField);
         }
 
@@ -120,11 +95,12 @@ public class FieldInfoUtils {
         // Get sink field info list, if the field name equals to build-in field, new a build-in field info
         for (SinkField field : fieldList) {
             FieldInfo sinkField = getFieldInfo(field.getFieldName(), field.getFieldType(),
-                    field.getIsMetaField() == 1, field.getFieldFormat());
+                    field.getIsMetaField() == 1, field.getMetaFieldName(), field.getFieldFormat());
             sinkFields.add(sinkField);
             if (StringUtils.isNotBlank(field.getSourceFieldName())) {
                 FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(),
-                        field.getSourceFieldType(), field.getIsMetaField() == 1, field.getFieldFormat());
+                        field.getSourceFieldType(), field.getIsMetaField() == 1,
+                        field.getMetaFieldName(), field.getFieldFormat());
                 mappingUnitList.add(new FieldMappingUnit(sourceField, sinkField));
             }
         }
@@ -137,27 +113,13 @@ public class FieldInfoUtils {
      *
      * @apiNote If the field name equals to build-in field, new a build-in field info
      */
-    private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin, String format) {
-        BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
-        FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase(), format);
-        if (isBuiltin && builtInField != null) {
-            return new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+    private static FieldInfo getFieldInfo(String fieldName, String fieldType,
+            boolean isMetaField, String metaFieldName, String format) {
+        if (isMetaField) {
+            // TODO The meta field should be picked from the supported MetaField values, not typed in by the user
+            return new MetaFieldInfo(fieldName, MetaField.forName(metaFieldName));
         } else {
-            if (isBuiltin) {
-                // Check if fieldName contains buildInFieldName, such as left_database
-                // TODO The buildin field needs to be selectable and cannot be filled in by the user
-                for (String buildInFieldName : BUILT_IN_FIELD_MAP.keySet()) {
-                    if (fieldName.contains(buildInFieldName)) {
-                        builtInField = BUILT_IN_FIELD_MAP.get(buildInFieldName);
-                        break;
-                    }
-                }
-                if (builtInField != null) {
-                    return new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
-                }
-                log.warn("Unsupported metadata fieldName={} as the builtInField is null", fieldName);
-            }
-            return new FieldInfo(fieldName, formatInfo);
+            return new FieldInfo(fieldName, convertFieldFormat(fieldType.toLowerCase(), format));
         }
     }
 
@@ -167,18 +129,13 @@ public class FieldInfoUtils {
     public static List<FieldMappingUnit> setAllMigrationFieldMapping(List<FieldInfo> sourceFields,
             List<FieldInfo> sinkFields) {
         List<FieldMappingUnit> mappingUnitList = new ArrayList<>();
-        BuiltInFieldInfo dataField = new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
-                BuiltInField.MYSQL_METADATA_DATA);
+        MetaFieldInfo dataField = new MetaFieldInfo("data", MetaField.DATA);
         sourceFields.add(dataField);
         sinkFields.add(dataField);
         mappingUnitList.add(new FieldMappingUnit(dataField, dataField));
-
-        for (Map.Entry<String, BuiltInField> entry : BUILT_IN_FIELD_MAP.entrySet()) {
-            if (entry.getKey().equals("data_time")) {
-                continue;
-            }
-            BuiltInFieldInfo fieldInfo = new BuiltInFieldInfo(entry.getKey(),
-                    StringFormatInfo.INSTANCE, entry.getValue());
+        // TODO to be discarded later; note MetaField.values() includes DATA, which is already mapped above
+        for (MetaField metaField : MetaField.values()) {
+            MetaFieldInfo fieldInfo = new MetaFieldInfo(metaField.name(), metaField);
             sourceFields.add(fieldInfo);
             sinkFields.add(fieldInfo);
             mappingUnitList.add(new FieldMappingUnit(fieldInfo, fieldInfo));
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 13ad0d454..e20437a39 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -379,6 +379,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`     varchar(20)  DEFAULT NULL COMMENT 'Meta field name',
     `field_format`        varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -595,6 +596,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `field_type`       varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`    varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`    smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`  varchar(20)  DEFAULT NULL COMMENT 'Meta field name',
     `field_format`     varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -619,6 +621,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`   varchar(20)   DEFAULT NULL COMMENT 'Meta field name',
     `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -646,6 +649,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `ext_params`        text COMMENT 'Field ext params',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`   varchar(20)   DEFAULT NULL COMMENT 'Meta field name',
     `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index b0e06ec02..fe68bba78 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -405,6 +405,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`     varchar(20)  DEFAULT NULL COMMENT 'Meta field name',
     `field_format`        varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -630,6 +631,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `field_type`       varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`    varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`    smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`  varchar(20)  DEFAULT NULL COMMENT 'Meta field name',
     `field_format`     varchar(50)  DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -654,6 +656,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`   varchar(20)   DEFAULT NULL COMMENT 'Meta field name',
     `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -682,6 +685,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `ext_params`        text COMMENT 'Field ext params',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `meta_field_name`   varchar(20)   DEFAULT NULL COMMENT 'Meta field name',
     `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
deleted file mode 100644
index d1506ad54..000000000
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.protocol;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.formats.common.FormatInfo;
-
-/**
- * built-in field info.
- */
-@Deprecated
-public class BuiltInFieldInfo extends FieldInfo {
-
-    private static final long serialVersionUID = -3436204467879205139L;
-
-    @JsonProperty("builtinField")
-    private final BuiltInField builtInField;
-
-    @JsonCreator
-    public BuiltInFieldInfo(
-            @JsonProperty("name") String name,
-            @JsonProperty("nodeId") String nodeId,
-            @JsonProperty("formatInfo") FormatInfo formatInfo,
-            @JsonProperty("builtinField") BuiltInField builtInField) {
-        super(name, nodeId, formatInfo);
-        this.builtInField = builtInField;
-    }
-
-    public BuiltInFieldInfo(
-            @JsonProperty("name") String name,
-            @JsonProperty("formatInfo") FormatInfo formatInfo,
-            @JsonProperty("builtinField") BuiltInField builtInField) {
-        super(name, formatInfo);
-        this.builtInField = builtInField;
-    }
-
-    @JsonProperty("builtinField")
-    public BuiltInField getBuiltInField() {
-        return builtInField;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-        BuiltInFieldInfo that = (BuiltInFieldInfo) o;
-        return builtInField == that.builtInField
-                && super.equals(that);
-    }
-
-    public enum BuiltInField {
-        /**
-         * The event time of flink
-         */
-        @Deprecated
-        DATA_TIME,
-        /**
-         * The process time of flink
-         */
-        PROCESS_TIME,
-        /**
-         * The name of the database containing this Row
-         * It is deprecated and can be replaced by ${@link BuiltInField#DATABASE_NAME}
-         * and will be removed in a future version.
-         */
-        @Deprecated
-        MYSQL_METADATA_DATABASE,
-        /**
-         * The name of the table containing this Row
-         * It is deprecated and can be replaced by ${@link BuiltInField#TABLE_NAME}
-         * and will be removed in a future version.
-         */
-        @Deprecated
-        MYSQL_METADATA_TABLE,
-        /**
-         * The time when the Row made changes in the database.
-         * It is deprecated and can be replaced by ${@link BuiltInField#OP_TS}
-         * and will be removed in a future version.
-         */
-        @Deprecated
-        MYSQL_METADATA_EVENT_TIME,
-        /**
-         * Name of the schema that contain the row.
-         */
-        SCHEMA_NAME,
-        /**
-         * Name of the database that contain the row.
-         */
-        DATABASE_NAME,
-        /**
-         * Name of the table that contain the row.
-         */
-        TABLE_NAME,
-        /**
-         * It indicates the time that the change was made in the database.
-         * If the record is read from snapshot of the table instead of the change stream, the value is always 0
-         */
-        OP_TS,
-        /**
-         * Whether the DDL statement
-         */
-        IS_DDL,
-        /**
-         * Whether the DDL statement
-         * It is deprecated and can be replaced by ${@link BuiltInField#IS_DDL}
-         * and will be removed in a future version.
-         */
-        @Deprecated
-        MYSQL_METADATA_IS_DDL,
-        /**
-         * Type of database operation, such as INSERT/DELETE, etc.
-         */
-        OP_TYPE,
-        /**
-         * Type of database operation, such as INSERT/DELETE, etc.
-         * It is deprecated and can be replaced by ${@link BuiltInField#OP_TYPE}
-         * and will be removed in a future version.
-         */
-        @Deprecated
-        MYSQL_METADATA_EVENT_TYPE,
-        /**
-         * MySQL binlog data Row
-         */
-        MYSQL_METADATA_DATA,
-        /**
-         * The value of the field before update
-         */
-        METADATA_UPDATE_BEFORE,
-        /**
-         * Batch id of binlog
-         */
-        METADATA_BATCH_ID,
-        /**
-         * Mapping of sql_type table fields to java data type IDs
-         */
-        METADATA_SQL_TYPE,
-        /**
-         * The current time when the ROW was received and processed
-         */
-        METADATA_TS,
-        /**
-         * The table structure
-         */
-        METADATA_MYSQL_TYPE,
-        /**
-         * Primary key field name
-         */
-        METADATA_PK_NAMES
-    }
-}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 453a049b0..3ff8a2659 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -38,9 +38,8 @@ import java.io.Serializable;
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
-        @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
-        @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
+        @JsonSubTypes.Type(value = FieldInfo.class, name = "field"),
+        @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField")
 })
 @Data
 public class FieldInfo implements FunctionParam, Serializable {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
index bd3d389d6..4bfac5512 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
@@ -122,6 +122,15 @@ public class MetaFieldInfo extends FieldInfo {
         /**
          * Primary key field name. Currently, it is used for MySQL database.
          */
-        PK_NAMES
+        PK_NAMES;
+
+        public static MetaField forName(String name) {
+            for (MetaField metaField : values()) {
+                if (metaField.name().equals(name)) {
+                    return metaField;
+                }
+            }
+            throw new UnsupportedOperationException(String.format("Unsupported MetaField=%s for Inlong", name));
+        }
     }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index b1ef83e54..f72a9d128 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.protocol.transformation;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
@@ -62,8 +61,7 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
-        @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin"),
+        @JsonSubTypes.Type(value = FieldInfo.class, name = "field"),
         @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
         @JsonSubTypes.Type(value = ConstantParam.class, name = "constant"),
         @JsonSubTypes.Type(value = TimeUnitConstantParam.class, name = "timeUnitConstant"),
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
deleted file mode 100644
index 423a64de4..000000000
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.protocol;
-
-import org.apache.inlong.sort.SerializeBaseTest;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-
-/**
- * Test for {@link BuiltInFieldInfo}
- */
-public class BuiltInFieldInfoTest extends SerializeBaseTest<BuiltInFieldInfo> {
-
-    @Override
-    public BuiltInFieldInfo getTestObject() {
-        return new BuiltInFieldInfo("f1", StringFormatInfo.INSTANCE, BuiltInFieldInfo.BuiltInField.DATA_TIME);
-    }
-}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index e2ed364ec..1e2553eb4 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -43,7 +43,7 @@ public class FieldInfoTest extends SerializeBaseTest<FieldInfo> {
         FieldInfo fieldInfo = new FieldInfo("field_name", StringFormatInfo.INSTANCE);
         fieldInfo.setNodeId("1");
         ObjectMapper objectMapper = new ObjectMapper();
-        String fieldInfoStr = "{\"type\":\"base\",\"name\":\"field_name\","
+        String fieldInfoStr = "{\"type\":\"field\",\"name\":\"field_name\","
                 + "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
         FieldInfo expected = objectMapper.readValue(fieldInfoStr, FieldInfo.class);
         assertEquals(expected, fieldInfo);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 8ef086c6e..7046bcc24 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -25,8 +25,6 @@ import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
 import org.apache.inlong.sort.parser.Parser;
 import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
 import org.apache.inlong.sort.parser.result.ParseResult;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.MetaFieldInfo;
@@ -670,9 +668,6 @@ public class FlinkSqlParser implements Parser {
             if (field instanceof MetaFieldInfo) {
                 MetaFieldInfo metaFieldInfo = (MetaFieldInfo) field;
                 parseMetaField(node, metaFieldInfo, sb);
-            } else if (field instanceof BuiltInFieldInfo) {
-                BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) field;
-                parseMetaField(node, builtInFieldInfo, sb);
             } else {
                 sb.append(TableFormatUtils.deriveLogicalType(field.getFormatInfo()).asSummaryString());
             }
@@ -684,27 +679,6 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
-    @Deprecated
-    private void parseMetaField(Node node, BuiltInFieldInfo metaField, StringBuilder sb) {
-        if (metaField.getBuiltInField() == BuiltInField.PROCESS_TIME) {
-            sb.append(" AS PROCTIME()");
-            return;
-        }
-        if (node instanceof MySqlExtractNode) {
-            sb.append(parseMySqlExtractNodeMetaField(metaField));
-        } else if (node instanceof OracleExtractNode) {
-            sb.append(parseOracleExtractNodeMetaField(metaField));
-        } else if (node instanceof KafkaExtractNode) {
-            sb.append(parseKafkaExtractNodeMetaField(metaField));
-        } else if (node instanceof KafkaLoadNode) {
-            sb.append(parseKafkaLoadNodeMetaField(metaField));
-        } else {
-            throw new UnsupportedOperationException(
-                    String.format("This node:%s does not currently support metadata fields",
-                            node.getClass().getName()));
-        }
-    }
-
     private void parseMetaField(Node node, MetaFieldInfo metaFieldInfo, StringBuilder sb) {
         if (metaFieldInfo.getMetaField() == MetaFieldInfo.MetaField.PROCESS_TIME) {
             sb.append(" AS PROCTIME()");
@@ -725,58 +699,6 @@ public class FlinkSqlParser implements Parser {
         }
     }
 
-    @Deprecated
-    private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
-        String metaType;
-        switch (metaField.getBuiltInField()) {
-            case MYSQL_METADATA_TABLE:
-            case TABLE_NAME:
-                metaType = "STRING METADATA FROM 'value.table'";
-                break;
-            case MYSQL_METADATA_DATABASE:
-            case DATABASE_NAME:
-                metaType = "STRING METADATA FROM 'value.database'";
-                break;
-            case MYSQL_METADATA_EVENT_TIME:
-            case OP_TS:
-                metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'";
-                break;
-            case MYSQL_METADATA_EVENT_TYPE:
-            case OP_TYPE:
-                metaType = "STRING METADATA FROM 'value.op-type'";
-                break;
-            case MYSQL_METADATA_DATA:
-                metaType = "STRING METADATA FROM 'value.data'";
-                break;
-            case MYSQL_METADATA_IS_DDL:
-            case IS_DDL:
-                metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
-                break;
-            case METADATA_TS:
-                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
-                break;
-            case METADATA_SQL_TYPE:
-                metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
-                break;
-            case METADATA_MYSQL_TYPE:
-                metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
-                break;
-            case METADATA_PK_NAMES:
-                metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
-                break;
-            case METADATA_BATCH_ID:
-                metaType = "BIGINT METADATA FROM 'value.batch-id'";
-                break;
-            case METADATA_UPDATE_BEFORE:
-                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
-                break;
-            default:
-                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
-                        metaField.getBuiltInField()));
-        }
-        return metaType;
-    }
-
     private String parseKafkaLoadNodeMetaField(MetaFieldInfo metaFieldInfo) {
         String metaType;
         switch (metaFieldInfo.getMetaField()) {
@@ -823,56 +745,6 @@ public class FlinkSqlParser implements Parser {
         return metaType;
     }
 
-    @Deprecated
-    private String parseKafkaExtractNodeMetaField(BuiltInFieldInfo metaField) {
-        String metaType;
-        switch (metaField.getBuiltInField()) {
-            case MYSQL_METADATA_TABLE:
-            case TABLE_NAME:
-                metaType = "STRING METADATA FROM 'value.table'";
-                break;
-            case MYSQL_METADATA_DATABASE:
-            case DATABASE_NAME:
-                metaType = "STRING METADATA FROM 'value.database'";
-                break;
-            case METADATA_SQL_TYPE:
-                metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
-                break;
-            case METADATA_PK_NAMES:
-                metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
-                break;
-            case METADATA_TS:
-                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
-                break;
-            case MYSQL_METADATA_EVENT_TIME:
-            case OP_TS:
-                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'";
-                break;
-            // additional metadata
-            case MYSQL_METADATA_EVENT_TYPE:
-            case OP_TYPE:
-                metaType = "STRING METADATA FROM 'value.op-type'";
-                break;
-            case MYSQL_METADATA_IS_DDL:
-            case IS_DDL:
-                metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
-                break;
-            case METADATA_MYSQL_TYPE:
-                metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
-                break;
-            case METADATA_BATCH_ID:
-                metaType = "BIGINT METADATA FROM 'value.batch-id'";
-                break;
-            case METADATA_UPDATE_BEFORE:
-                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
-                break;
-            default:
-                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
-                        metaField.getBuiltInField()));
-        }
-        return metaType;
-    }
-
     private String parseKafkaExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
         String metaType;
         switch (metaFieldInfo.getMetaField()) {
@@ -963,80 +835,6 @@ public class FlinkSqlParser implements Parser {
         return metaType;
     }
 
-    private String parseMySqlExtractNodeMetaField(BuiltInFieldInfo metaField) {
-        String metaType;
-        switch (metaField.getBuiltInField()) {
-            case MYSQL_METADATA_TABLE:
-            case TABLE_NAME:
-                metaType = "STRING METADATA FROM 'meta.table_name' VIRTUAL";
-                break;
-            case MYSQL_METADATA_DATABASE:
-            case DATABASE_NAME:
-                metaType = "STRING METADATA FROM 'meta.database_name' VIRTUAL";
-                break;
-            case MYSQL_METADATA_EVENT_TIME:
-            case OP_TS:
-                metaType = "TIMESTAMP(3) METADATA FROM 'meta.op_ts' VIRTUAL";
-                break;
-            case MYSQL_METADATA_EVENT_TYPE:
-            case OP_TYPE:
-                metaType = "STRING METADATA FROM 'meta.op_type' VIRTUAL";
-                break;
-            case MYSQL_METADATA_DATA:
-                metaType = "STRING METADATA FROM 'meta.data' VIRTUAL";
-                break;
-            case MYSQL_METADATA_IS_DDL:
-            case IS_DDL:
-                metaType = "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL";
-                break;
-            case METADATA_TS:
-                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL";
-                break;
-            case METADATA_SQL_TYPE:
-                metaType = "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL";
-                break;
-            case METADATA_MYSQL_TYPE:
-                metaType = "MAP<STRING, STRING> METADATA FROM 'meta.mysql_type' VIRTUAL";
-                break;
-            case METADATA_PK_NAMES:
-                metaType = "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL";
-                break;
-            case METADATA_BATCH_ID:
-                metaType = "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL";
-                break;
-            case METADATA_UPDATE_BEFORE:
-                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL";
-                break;
-            default:
-                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
-                        metaField.getBuiltInField()));
-        }
-        return metaType;
-    }
-
-    @Deprecated
-    private String parseOracleExtractNodeMetaField(BuiltInFieldInfo metaField) {
-        String metaType;
-        switch (metaField.getBuiltInField()) {
-            case TABLE_NAME:
-                metaType = "STRING METADATA FROM 'table_name' VIRTUAL";
-                break;
-            case SCHEMA_NAME:
-                metaType = "STRING METADATA FROM 'schema_name' VIRTUAL";
-                break;
-            case DATABASE_NAME:
-                metaType = "STRING METADATA FROM 'database_name' VIRTUAL";
-                break;
-            case OP_TS:
-                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL";
-                break;
-            default:
-                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
-                        metaField.getBuiltInField()));
-        }
-        return metaType;
-    }
-
     private String parseOracleExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
         String metaType;
         switch (metaFieldInfo.getMetaField()) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 3a6057517..26892eb69 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.parser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
 import org.apache.inlong.sort.parser.result.ParseResult;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -43,13 +43,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField.MYSQL_METADATA_DATA;
-
 public class AllMigrateTest {
 
     private MySqlExtractNode buildAllMigrateExtractNode() {
         List<FieldInfo> fields = Arrays.asList(
-                new BuiltInFieldInfo("data", new StringFormatInfo(), MYSQL_METADATA_DATA));
+                new MetaFieldInfo("data", MetaFieldInfo.MetaField.DATA));
         Map<String, String> option = new HashMap<>();
         option.put("append-mode", "true");
         option.put("migrate-all", "true");
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index bc538e3dd..f889caf50 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -21,17 +21,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
-import org.apache.inlong.sort.parser.result.ParseResult;
 import org.apache.inlong.sort.formats.common.FloatFormatInfo;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
@@ -65,7 +64,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()),
-                new BuiltInFieldInfo("proctime", new TimestampFormatInfo(), BuiltInField.PROCESS_TIME));
+                new MetaFieldInfo("proctime", MetaFieldInfo.MetaField.PROCESS_TIME));
         return new KafkaExtractNode("1", "kafka_input", fields, null,
                 null, "topic_input", "localhost:9092",
                 new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
@@ -78,7 +77,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()),
-                new BuiltInFieldInfo("proctime", new TimestampFormatInfo(), BuiltInField.PROCESS_TIME));
+                new MetaFieldInfo("proctime", MetaFieldInfo.MetaField.PROCESS_TIME));
         WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
                 new StringConstantParam("1"),
                 new TimeUnitConstantParam(TimeUnit.SECOND));
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index 045fba0c7..eb71f10c4 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -21,20 +21,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
-import org.apache.inlong.sort.parser.result.ParseResult;
-import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.FloatFormatInfo;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
@@ -63,27 +59,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()),
-                new BuiltInFieldInfo("database", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_DATABASE),
-                new BuiltInFieldInfo("table", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_TABLE),
-                new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
-                        BuiltInField.METADATA_PK_NAMES),
-                new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TIME),
-                new BuiltInFieldInfo("event_type", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TYPE),
-                new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_IS_DDL),
-                new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
-                        BuiltInField.METADATA_BATCH_ID),
-                new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
-                new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
-                new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
-                new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+                new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+                new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+                new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+                new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+                new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+                new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+                new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+                new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+                new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+                new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+                new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
         );
         return new MySqlExtractNode("1", "mysql_input", fields, null, null,
                 "id", Collections.singletonList("mysql_table"),
@@ -97,27 +83,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()),
-                new BuiltInFieldInfo("database", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_DATABASE),
-                new BuiltInFieldInfo("table", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_TABLE),
-                new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
-                        BuiltInField.METADATA_PK_NAMES),
-                new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TIME),
-                new BuiltInFieldInfo("event_type", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TYPE),
-                new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_IS_DDL),
-                new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
-                        BuiltInField.METADATA_BATCH_ID),
-                new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
-                new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
-                new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
-                new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+                new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+                new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+                new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+                new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+                new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+                new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+                new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+                new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+                new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+                new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+                new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
         );
         List<FieldRelation> relations = Arrays
                 .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
@@ -163,27 +139,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()),
-                new BuiltInFieldInfo("database", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_DATABASE),
-                new BuiltInFieldInfo("table", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_TABLE),
-                new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
-                        BuiltInField.METADATA_PK_NAMES),
-                new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TIME),
-                new BuiltInFieldInfo("event_type", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TYPE),
-                new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_IS_DDL),
-                new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
-                        BuiltInField.METADATA_BATCH_ID),
-                new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
-                new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
-                new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
-                new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+                new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+                new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+                new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+                new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+                new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+                new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+                new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+                new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+                new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+                new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+                new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
         );
         return new KafkaExtractNode("3", "kafka_input", fields,
                 null, null, "topic1", "localhost:9092",
@@ -197,27 +163,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()),
-                new BuiltInFieldInfo("database", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_DATABASE),
-                new BuiltInFieldInfo("table", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_TABLE),
-                new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
-                        BuiltInField.METADATA_PK_NAMES),
-                new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TIME),
-                new BuiltInFieldInfo("event_type", new StringFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_EVENT_TYPE),
-                new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
-                        BuiltInField.MYSQL_METADATA_IS_DDL),
-                new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
-                        BuiltInField.METADATA_BATCH_ID),
-                new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
-                new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
-                        new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
-                new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
-                new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
-                        new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+                new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+                new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+                new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+                new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+                new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+                new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+                new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+                new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+                new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+                new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+                new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
         );
         List<FieldRelation> relations = Arrays
                 .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),