You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/24 08:31:54 UTC

[incubator-inlong] branch master updated: [INLONG-3334][Manager] Get source field list from stream field table for Sort (#3344)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 07f612b  [INLONG-3334][Manager] Get source field list from stream field table for Sort (#3344)
07f612b is described below

commit 07f612b9afd972f4cb3d10a3988825ffded985bb
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 24 16:31:50 2022 +0800

    [INLONG-3334][Manager] Get source field list from stream field table for Sort (#3344)
    
    * [INLONG-3334][Manager] Get source field list from stream field table for Sort
    
    * [INLONG-3334][Manager] Change order by field
    
    * [INLONG-3334][Manager] Change the field comment
---
 .../inlong/manager/client/api/SinkField.java       |  10 +-
 .../inlong/manager/client/api/StreamField.java     |   6 +-
 .../manager/client/api/impl/InlongStreamImpl.java  |  16 +-
 .../client/api/util/InlongStreamSinkTransfer.java  |  22 +--
 .../client/api/util/InlongStreamTransfer.java      |  15 +-
 .../manager/common/pojo/sink/SinkFieldRequest.java |   4 +-
 .../common/pojo/sink/SinkFieldResponse.java        |   4 +-
 .../common/pojo/stream/InlongStreamFieldInfo.java  |  10 +-
 .../dao/entity/InlongStreamFieldEntity.java        |   4 +-
 .../manager/dao/entity/StreamSinkFieldEntity.java  |   2 +-
 .../dao/mapper/InlongStreamFieldEntityMapper.java  |  24 +--
 .../mappers/InlongStreamFieldEntityMapper.xml      | 209 +++++----------------
 .../mappers/StreamSinkFieldEntityMapper.xml        |  17 +-
 .../manager/service/CommonOperateService.java      |  19 +-
 .../service/core/impl/AgentServiceImpl.java        |   4 +-
 .../service/core/impl/InlongStreamServiceImpl.java |  13 +-
 .../thirdparty/sort/util/FieldInfoUtils.java       |  99 ++++------
 .../main/resources/sql/apache_inlong_manager.sql   |  27 +--
 .../manager-web/sql/apache_inlong_manager.sql      |  27 +--
 19 files changed, 202 insertions(+), 330 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index a728a72..ecf520f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -20,10 +20,12 @@ package org.apache.inlong.manager.client.api;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.FieldType;
 
 @Data
+@EqualsAndHashCode(callSuper = true)
 @NoArgsConstructor
 @ApiModel("Sink field configuration")
 public class SinkField extends StreamField {
@@ -34,14 +36,10 @@ public class SinkField extends StreamField {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
-    @ApiModelProperty("Is source meta field, 0: no, 1: yes")
-    private Integer isSourceMetaField = 0;
-
     public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
-            String fieldValue, String sourceFieldName, String sourceFieldType, Integer isSourceMetaField) {
-        super(index, fieldType, fieldName, fieldComment, fieldValue);
+            String fieldValue, String sourceFieldName, String sourceFieldType, Integer isMetaField) {
+        super(index, fieldType, fieldName, fieldComment, fieldValue, isMetaField);
         this.sourceFieldName = sourceFieldName;
         this.sourceFieldType = sourceFieldType;
-        this.isSourceMetaField = isSourceMetaField;
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index 992c500..445c53b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -31,7 +31,7 @@ import org.apache.inlong.manager.common.enums.FieldType;
 public class StreamField {
 
     @ApiModelProperty("Field index")
-    private int index;
+    private Integer id;
 
     @ApiModelProperty(value = "Field type", required = true)
     private FieldType fieldType;
@@ -44,4 +44,8 @@ public class StreamField {
 
     @ApiModelProperty(value = "Field value for constants")
     private String fieldValue;
+
+    @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+    private Integer isMetaField = 0;
+
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index e1b6180..d6a2ad7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -60,12 +60,14 @@ public class InlongStreamImpl extends InlongStream {
         List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
             this.streamFields = streamFieldInfos.stream()
-                    .map(streamFieldInfo -> new StreamField(
-                            streamFieldInfo.getId(),
-                            FieldType.forName(streamFieldInfo.getFieldType()),
-                            streamFieldInfo.getFieldName(),
-                            streamFieldInfo.getFieldComment(),
-                            streamFieldInfo.getFieldValue())
+                    .map(fieldInfo -> new StreamField(
+                                    fieldInfo.getId(),
+                                    FieldType.forName(fieldInfo.getFieldType()),
+                                    fieldInfo.getFieldName(),
+                                    fieldInfo.getFieldComment(),
+                                    fieldInfo.getFieldValue(),
+                                    fieldInfo.getIsMetaField()
+                            )
                     ).collect(Collectors.toList());
         }
         List<SinkResponse> sinkList = fullStreamResponse.getSinkInfo();
@@ -82,7 +84,7 @@ public class InlongStreamImpl extends InlongStream {
         List<SourceResponse> sourceList = fullStreamResponse.getSourceInfo();
         if (CollectionUtils.isNotEmpty(sourceList)) {
             this.streamSources = sourceList.stream()
-                    .map(sourceResponse -> InlongStreamSourceTransfer.parseStreamSource(sourceResponse))
+                    .map(InlongStreamSourceTransfer::parseStreamSource)
                     .collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource,
                             (source1, source2) -> {
                                 throw new RuntimeException(
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index dccbdfb..38bf6e7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -153,7 +153,7 @@ public class InlongStreamSinkTransfer {
                 sinkFieldResponse.getFieldComment(),
                 null, sinkFieldResponse.getSourceFieldName(),
                 sinkFieldResponse.getSourceFieldType(),
-                sinkFieldResponse.getIsSourceMetaField())).collect(Collectors.toList());
+                sinkFieldResponse.getIsMetaField())).collect(Collectors.toList());
 
     }
 
@@ -231,18 +231,18 @@ public class InlongStreamSinkTransfer {
     }
 
     private static List<SinkFieldRequest> createSinkFieldRequests(List<SinkField> sinkFields) {
-        List<SinkFieldRequest> sinkFieldRequests = Lists.newArrayList();
+        List<SinkFieldRequest> fieldRequestList = Lists.newArrayList();
         for (SinkField sinkField : sinkFields) {
-            SinkFieldRequest sinkFieldRequest = new SinkFieldRequest();
-            sinkFieldRequest.setFieldName(sinkField.getFieldName());
-            sinkFieldRequest.setFieldType(sinkField.getFieldType().toString());
-            sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
-            sinkFieldRequest.setSourceFieldName(sinkField.getSourceFieldName());
-            sinkFieldRequest.setSourceFieldType(sinkField.getSourceFieldType());
-            sinkFieldRequest.setIsSourceMetaField(sinkField.getIsSourceMetaField());
-            sinkFieldRequests.add(sinkFieldRequest);
+            SinkFieldRequest request = new SinkFieldRequest();
+            request.setFieldName(sinkField.getFieldName());
+            request.setFieldType(sinkField.getFieldType().toString());
+            request.setFieldComment(sinkField.getFieldComment());
+            request.setSourceFieldName(sinkField.getSourceFieldName());
+            request.setSourceFieldType(sinkField.getSourceFieldType());
+            request.setIsMetaField(sinkField.getIsMetaField());
+            fieldRequestList.add(request);
         }
-        return sinkFieldRequests;
+        return fieldRequestList;
     }
 
     private static HiveSink parseHiveSink(HiveSinkResponse sinkResponse, StreamSink sink) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 0ca083b..bf68845 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -57,18 +57,17 @@ public class InlongStreamTransfer {
     }
 
     public static List<InlongStreamFieldInfo> createStreamFields(
-            List<StreamField> fieldList,
-            InlongStreamResponse streamInfo) {
-        List<InlongStreamFieldInfo> fieldInfos = fieldList.stream().map(streamField -> {
+            List<StreamField> fieldList, InlongStreamResponse streamInfo) {
+        return fieldList.stream().map(field -> {
             InlongStreamFieldInfo fieldInfo = new InlongStreamFieldInfo();
             fieldInfo.setInlongStreamId(streamInfo.getInlongStreamId());
             fieldInfo.setInlongGroupId(streamInfo.getInlongGroupId());
-            fieldInfo.setFieldName(streamField.getFieldName());
-            fieldInfo.setFieldType(streamField.getFieldType().toString());
-            fieldInfo.setFieldComment(streamField.getFieldComment());
-            fieldInfo.setFieldValue(streamField.getFieldValue());
+            fieldInfo.setFieldName(field.getFieldName());
+            fieldInfo.setFieldType(field.getFieldType().toString());
+            fieldInfo.setFieldComment(field.getFieldComment());
+            fieldInfo.setFieldValue(field.getFieldValue());
+            fieldInfo.setIsMetaField(field.getIsMetaField());
             return fieldInfo;
         }).collect(Collectors.toList());
-        return fieldInfos;
     }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index 8efb7cf..d5d920a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -46,8 +46,8 @@ public class SinkFieldRequest {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
-    @ApiModelProperty("Is source meta field, 0: no, 1: yes")
-    private Integer isSourceMetaField = 0;
+    @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+    private Integer isMetaField = 0;
 
     @ApiModelProperty("Field order")
     private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index 773d476..8933a25 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -51,8 +51,8 @@ public class SinkFieldResponse {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
-    @ApiModelProperty("Is source meta field, 0: no, 1: yes")
-    private Integer isSourceMetaField = 0;
+    @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+    private Integer isMetaField = 0;
 
     @ApiModelProperty("Field order")
     private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
index 754280a..d10873f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
 /**
- * Inlong stream field info
+ * Inlong stream field info.
  */
 @Data
 @ApiModel("Inlong stream field info")
@@ -48,14 +48,16 @@ public class InlongStreamFieldInfo {
     @ApiModelProperty(value = "value expression of predefined field")
     private String preExpression;
 
+    @ApiModelProperty("Field type")
     private String fieldType;
 
+    @ApiModelProperty("Field comment")
     private String fieldComment;
 
+    @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+    private Integer isMetaField = 0;
+
     @ApiModelProperty(value = "field rank num")
     private Short rankNum;
 
-    @ApiModelProperty(value = "is deleted? 0: deleted, > 0: not deleted")
-    private Integer isDeleted = 0;
-
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
index a13934c..0109030 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.manager.dao.entity;
 
-import java.io.Serializable;
 import lombok.Data;
 
+import java.io.Serializable;
+
 @Data
 public class InlongStreamFieldEntity implements Serializable {
 
@@ -33,6 +34,7 @@ public class InlongStreamFieldEntity implements Serializable {
     private String preExpression;
     private String fieldType;
     private String fieldComment;
+    private Integer isMetaField;
     private Short rankNum;
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 98f888f..a32a123 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -37,7 +37,7 @@ public class StreamSinkFieldEntity implements Serializable {
     private Integer isRequired;
     private String sourceFieldName;
     private String sourceFieldType;
-    private Integer isSourceMetaField;
+    private Integer isMetaField;
     private Short rankNum;
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
index 4486e1c..fb20c11 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
@@ -26,34 +26,30 @@ import java.util.List;
 @Repository
 public interface InlongStreamFieldEntityMapper {
 
-    int deleteByPrimaryKey(Integer id);
-
     int insert(InlongStreamFieldEntity record);
 
-    int insertSelective(InlongStreamFieldEntity record);
+    int insertAll(@Param("fieldList") List<InlongStreamFieldEntity> fieldEntityList);
 
     InlongStreamFieldEntity selectByPrimaryKey(Integer id);
 
-    int updateByPrimaryKeySelective(InlongStreamFieldEntity record);
-
-    int updateByPrimaryKey(InlongStreamFieldEntity record);
-
     List<InlongStreamFieldEntity> selectByIdentifier(@Param("groupId") String groupId,
             @Param("streamId") String streamId);
 
-    int insertAll(@Param("fieldList") List<InlongStreamFieldEntity> fieldEntityList);
-
-    List<InlongStreamFieldEntity> selectStreamFields(@Param("groupId") String groupId,
+    List<InlongStreamFieldEntity> selectFields(@Param("groupId") String groupId,
             @Param("streamId") String streamId);
 
-    /**
-     * According to the inlong group id and inlong stream id, physically delete all fields
-     */
-    int deleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+    int updateByPrimaryKey(InlongStreamFieldEntity record);
+
+    int deleteByPrimaryKey(Integer id);
 
     /**
      * According to the inlong group id and inlong stream id, logically delete all fields
      */
     int logicDeleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+    /**
+     * According to the inlong group id and inlong stream id, physically delete all fields
+     */
+    int deleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index 4bee203..08f1afb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -30,148 +30,38 @@
         <result column="pre_expression" jdbcType="VARCHAR" property="preExpression"/>
         <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
+        <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
         <result column="rank_num" jdbcType="INTEGER" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
-        pre_expression, field_type, field_comment, rank_num, is_deleted
+        pre_expression, field_type, field_comment, is_meta_field, rank_num, is_deleted
     </sql>
-    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
-        select
-        <include refid="Base_Column_List"/>
-        from inlong_stream_field
-        where id = #{id,jdbcType=INTEGER}
-    </select>
-    <select id="selectByIdentifier" resultType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from inlong_stream_field
-        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
-        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-        and is_deleted = 0
-        order by rank_num asc
-    </select>
-    <select id="selectStreamFields" resultMap="BaseResultMap">
-        select f.*
-        from inlong_stream_field f,
-             inlong_stream s
-        where f.is_deleted = 0
-          and s.is_deleted = 0
-          and s.id = f.inlong_stream_id
-          and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
-          and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-          and f.is_predefined_field = 0
-    </select>
-
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-        delete
-        from inlong_stream_field
-        where id = #{id,jdbcType=INTEGER}
-    </delete>
-    <delete id="deleteAllByIdentifier">
-        delete
-        from inlong_stream_field
-        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
-          and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-          and is_deleted = 0
-    </delete>
 
     <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
         insert into inlong_stream_field (id, inlong_group_id, inlong_stream_id,
                                          is_predefined_field, field_name, field_value,
                                          pre_expression, field_type, field_comment,
-                                         rank_num, is_deleted)
+                                         is_meta_field, rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{isPredefinedField,jdbcType=INTEGER}, #{fieldName,jdbcType=VARCHAR}, #{fieldValue,jdbcType=VARCHAR},
                 #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
-                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
-    </insert>
-    <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
-        insert into inlong_stream_field
-        <trim prefix="(" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                id,
-            </if>
-            <if test="inlongGroupId != null">
-                inlong_group_id,
-            </if>
-            <if test="inlongStreamId != null">
-                inlong_stream_id,
-            </if>
-            <if test="isPredefinedField != null">
-                is_predefined_field,
-            </if>
-            <if test="fieldName != null">
-                field_name,
-            </if>
-            <if test="fieldValue != null">
-                field_value,
-            </if>
-            <if test="preExpression != null">
-                pre_expression,
-            </if>
-            <if test="fieldType != null">
-                field_type,
-            </if>
-            <if test="fieldComment != null">
-                field_comment,
-            </if>
-            <if test="rankNum != null">
-                rank_num,
-            </if>
-            <if test="isDeleted != null">
-                is_deleted,
-            </if>
-        </trim>
-        <trim prefix="values (" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                #{id,jdbcType=INTEGER},
-            </if>
-            <if test="inlongGroupId != null">
-                #{inlongGroupId,jdbcType=VARCHAR},
-            </if>
-            <if test="inlongStreamId != null">
-                #{inlongStreamId,jdbcType=VARCHAR},
-            </if>
-            <if test="isPredefinedField != null">
-                #{isPredefinedField,jdbcType=INTEGER},
-            </if>
-            <if test="fieldName != null">
-                #{fieldName,jdbcType=VARCHAR},
-            </if>
-            <if test="fieldValue != null">
-                #{fieldValue,jdbcType=VARCHAR},
-            </if>
-            <if test="preExpression != null">
-                #{preExpression,jdbcType=VARCHAR},
-            </if>
-            <if test="fieldType != null">
-                #{fieldType,jdbcType=VARCHAR},
-            </if>
-            <if test="fieldComment != null">
-                #{fieldComment,jdbcType=VARCHAR},
-            </if>
-            <if test="rankNum != null">
-                #{rankNum,jdbcType=SMALLINT},
-            </if>
-            <if test="isDeleted != null">
-                #{isDeleted,jdbcType=INTEGER},
-            </if>
-        </trim>
+                #{isMetaField,jdbcType=SMALLINT}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
-
     <!-- Bulk insert, update if it exists -->
     <insert id="insertAll" parameterType="java.util.List">
-        insert into inlong_stream_field
-        (id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
-        pre_expression, field_type, field_comment, rank_num, is_deleted
+        insert into inlong_stream_field (
+        id, inlong_group_id, inlong_stream_id, is_predefined_field,
+        field_name, field_value, pre_expression, field_type,
+        field_comment, is_meta_field, rank_num, is_deleted
         )
         values
         <foreach collection="fieldList" index="index" item="item" separator=",">
-            (#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
-            #{item.fieldName}, #{item.fieldValue},
-            #{item.preExpression}, #{item.fieldType}, #{item.fieldComment}, #{item.rankNum}, #{item.isDeleted}
+            (
+            #{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
+            #{item.fieldName}, #{item.fieldValue}, #{item.preExpression}, #{item.fieldType},
+            #{item.fieldComment}, #{isMetaField}, #{item.rankNum}, #{item.isDeleted}
             )
         </foreach>
         ON DUPLICATE KEY UPDATE
@@ -188,43 +78,33 @@
         is_deleted = values(is_deleted)
     </insert>
 
-    <update id="updateByPrimaryKeySelective"
-            parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
-        update inlong_stream_field
-        <set>
-            <if test="inlongGroupId != null">
-                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-            </if>
-            <if test="inlongStreamId != null">
-                inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-            </if>
-            <if test="isPredefinedField != null">
-                is_predefined_field = #{isPredefinedField,jdbcType=INTEGER},
-            </if>
-            <if test="fieldName != null">
-                field_name = #{fieldName,jdbcType=VARCHAR},
-            </if>
-            <if test="fieldValue != null">
-                field_value = #{fieldValue,jdbcType=VARCHAR},
-            </if>
-            <if test="preExpression != null">
-                pre_expression = #{preExpression,jdbcType=VARCHAR},
-            </if>
-            <if test="fieldType != null">
-                field_type = #{fieldType,jdbcType=VARCHAR},
-            </if>
-            <if test="fieldComment != null">
-                field_comment = #{fieldComment,jdbcType=VARCHAR},
-            </if>
-            <if test="rankNum != null">
-                rank_num = #{rankNum,jdbcType=SMALLINT},
-            </if>
-            <if test="isDeleted != null">
-                is_deleted = #{isDeleted,jdbcType=INTEGER},
-            </if>
-        </set>
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_stream_field
         where id = #{id,jdbcType=INTEGER}
-    </update>
+    </select>
+    <select id="selectByIdentifier" resultType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_stream_field
+        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+        and is_deleted = 0
+        order by id asc
+    </select>
+    <select id="selectFields" resultMap="BaseResultMap">
+        select field.*
+        from inlong_stream_field field,
+             inlong_stream stream
+        where field.is_deleted = 0
+          and stream.is_deleted = 0
+          and stream.id = field.inlong_stream_id
+          and stream.inlong_group_id = #{groupId, jdbcType=VARCHAR}
+          and stream.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+          and field.is_predefined_field = 0
+    </select>
+
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
         update inlong_stream_field
         set inlong_group_id     = #{inlongGroupId,jdbcType=VARCHAR},
@@ -247,4 +127,17 @@
           and is_deleted = 0
     </update>
 
+    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+        delete
+        from inlong_stream_field
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
+    <delete id="deleteAllByIdentifier">
+        delete
+        from inlong_stream_field
+        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+          and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+          and is_deleted = 0
+    </delete>
+
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index c04e68f..c9e495f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -31,13 +31,13 @@
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
         <result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
-        <result column="is_source_meta_field" jdbcType="SMALLINT" property="isSourceMetaField"/>
+        <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
         id, sink_id, field_name, field_type, field_comment,
-        source_field_name, source_field_type, is_source_meta_field, rank_num, is_deleted
+        source_field_name, source_field_type, is_meta_field, rank_num, is_deleted
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -45,12 +45,12 @@
         insert into stream_sink_field (id, inlong_group_id, inlong_stream_id,
                                        sink_id, sink_type, field_name,
                                        field_type, field_comment, source_field_name,
-                                       source_field_type, is_source_meta_field,
+                                       source_field_type, is_meta_field,
                                        rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sinkId,jdbcType=INTEGER}, #{sinkType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
                 #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR}, #{sourceFieldName,jdbcType=VARCHAR},
-                #{sourceFieldType,jdbcType=VARCHAR}, #{isSourceMetaField,jdbcType=SMALLINT},
+                #{sourceFieldType,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT},
                 #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertAll">
@@ -60,8 +60,7 @@
         sink_type, field_name,
         field_type, field_comment,
         source_field_name, source_field_type,
-        is_source_meta_field,
-        rank_num, is_deleted
+        is_meta_field, rank_num, is_deleted
         )
         values
         <foreach collection="list" index="index" item="item" separator=",">
@@ -71,8 +70,8 @@
             #{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
             #{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
-            #{item.isSourceMetaField,jdbcType=SMALLINT},
-            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
+            #{item.isMetaField,jdbcType=SMALLINT}, #{item.rankNum,jdbcType=SMALLINT},
+            #{item.isDeleted,jdbcType=INTEGER}
             )
         </foreach>
     </insert>
@@ -89,7 +88,7 @@
         from stream_sink_field
         where sink_id = #{sinkId, jdbcType=INTEGER}
         and is_deleted = 0
-        order by rank_num asc
+        order by id asc
     </select>
     <select id="selectFields" resultMap="BaseResultMap">
         select field.*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index bc23767..362aaff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -50,6 +50,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
 import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
 import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,22 +213,30 @@ public class CommonOperateService {
         // Get all field info
         List<FieldInfo> sourceFields = new ArrayList<>();
         List<FieldInfo> sinkFields = new ArrayList<>();
-        String partition = null;
+        // TODO Need support hive partition field
         if (SinkType.forType(sinkResponse.getSinkType()) == SinkType.HIVE) {
             HiveSinkResponse hiveSink = (HiveSinkResponse) sinkResponse;
-            partition = hiveSink.getPrimaryPartition();
+            String partition = hiveSink.getPrimaryPartition();
         }
 
         // TODO Support more than one source and one sink
         final SourceResponse sourceResponse = sourceList.get(0);
         boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
-        FieldMappingRule fieldMappingRule = FieldInfoUtils.createFieldInfo(isAllMigration,
-                sinkResponse.getFieldList(), sourceFields, sinkFields, partition);
+
+        List<FieldMappingUnit> mappingUnitList;
+        InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
+        if (isAllMigration) {
+            mappingUnitList = FieldInfoUtils.setAllMigrationFieldMapping(sourceFields, sinkFields);
+        } else {
+            mappingUnitList = FieldInfoUtils.createFieldInfo(streamInfo.getFieldList(),
+                    sinkResponse.getFieldList(), sourceFields, sinkFields);
+        }
+
+        FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
 
         // Get source info
         String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
         PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
-        InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
         SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
                 groupInfo, streamInfo, sourceResponse, sourceFields);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index cb45a55..23215c1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -309,7 +309,7 @@ public class AgentServiceImpl implements AgentService {
             }
 
             List<InlongStreamFieldEntity> preFields = streamFieldMapper
-                    .selectStreamFields(config.getInlongGroupId(), config.getInlongStreamId());
+                    .selectFields(config.getInlongGroupId(), config.getInlongStreamId());
 
             if (!config.getSortType().equalsIgnoreCase("13")) {
                 int fIndex = 0;
@@ -457,7 +457,7 @@ public class AgentServiceImpl implements AgentService {
             s.append("p=t").append("&");
         }
 
-        List<InlongStreamFieldEntity> preFields = streamFieldMapper.selectStreamFields(
+        List<InlongStreamFieldEntity> preFields = streamFieldMapper.selectFields(
                 config.getInlongGroupId(),
                 config.getInlongStreamId());
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 68f24c4..268dc78 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -619,12 +619,12 @@ public class InlongStreamServiceImpl implements InlongStreamService {
      * <p/>First physically delete the existing field information, and then add the field information of this batch
      */
     @Transactional(rollbackFor = Throwable.class)
-    void updateField(String groupId, String streamId, List<InlongStreamFieldInfo> fieldInfoList) {
+    void updateField(String groupId, String streamId, List<InlongStreamFieldInfo> fieldList) {
         LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId,
-                fieldInfoList);
+                fieldList);
         try {
             streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
-            saveField(groupId, streamId, fieldInfoList);
+            saveField(groupId, streamId, fieldList);
             LOGGER.info("success to update inlong stream field for groupId={}", groupId);
         } catch (Exception e) {
             LOGGER.error("failed to update inlong stream field: ", e);
@@ -637,13 +637,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
-        List<InlongStreamFieldEntity> entities = CommonBeanUtils.copyListProperties(infoList,
+        List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList,
                 InlongStreamFieldEntity::new);
-        for (InlongStreamFieldEntity entity : entities) {
+        for (InlongStreamFieldEntity entity : list) {
             entity.setInlongGroupId(groupId);
             entity.setInlongStreamId(streamId);
+            entity.setIsDeleted(Constant.UN_DELETED);
         }
-        streamFieldMapper.insertAll(entities);
+        streamFieldMapper.insertAll(list);
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
index bb386f8..a14276e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort.util;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.MetaFieldType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.ByteFormatInfo;
@@ -39,7 +39,6 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
 import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
 
 import java.util.ArrayList;
@@ -68,51 +67,32 @@ public class FieldInfoUtils {
 
     /**
      * Get field info list.
-     * TODO 1. Support partition field, 2. Add is_metadata field in StreamSinkFieldEntity
+     * TODO 1. Support partition field(not need to add index at 0), 2. Add is_metadata field in StreamSinkFieldEntity
      */
-    public static FieldMappingRule createFieldInfo(boolean isAllMigration, List<SinkFieldResponse> fieldList,
-            List<FieldInfo> sourceFields, List<FieldInfo> sinkFields, String partitionField) {
+    public static List<FieldMappingUnit> createFieldInfo(
+            List<InlongStreamFieldInfo> streamFieldList, List<SinkFieldResponse> fieldList,
+            List<FieldInfo> sourceFields, List<FieldInfo> sinkFields) {
+
+        // Set source field info list.
+        for (InlongStreamFieldInfo field : streamFieldList) {
+            FieldInfo sourceField = getFieldInfo(field.getFieldName(), field.getFieldType(),
+                    field.getIsMetaField() == 1);
+            sourceFields.add(sourceField);
+        }
 
-        List<FieldMappingUnit> fieldMappingUnitList = new ArrayList<>();
-        if (isAllMigration) {
-            setAllMigrationBuiltInField(sourceFields, sinkFields, fieldMappingUnitList);
-        } else {
-            boolean duplicate = false;
-            for (SinkFieldResponse field : fieldList) {
-                // If the field name equals to build-in field, new a build-in field info
-                FieldInfo sourceFieldInfo = getFieldInfo(field.getSourceFieldName(),
-                        field.getSourceFieldType(), field.getIsSourceMetaField() == 1);
-                sourceFields.add(sourceFieldInfo);
-
-                // Get sink field info
-                String sinkFieldName = field.getFieldName();
-                if (sinkFieldName.equals(partitionField)) {
-                    duplicate = true;
-                }
-                FieldInfo sinkFieldInfo = getSinkFieldInfo(field.getFieldName(), field.getFieldType(),
-                        field.getSourceFieldName(), field.getIsSourceMetaField() == 1);
-                sinkFields.add(sinkFieldInfo);
-
-                fieldMappingUnitList.add(new FieldMappingUnit(sourceFieldInfo, sinkFieldInfo));
-            }
+        List<FieldMappingUnit> mappingUnitList = new ArrayList<>();
+        // Get sink field info list, if the field name equals to build-in field, new a build-in field info
+        for (SinkFieldResponse field : fieldList) {
+            FieldInfo sinkField = getFieldInfo(field.getFieldName(), field.getFieldType(),
+                    field.getIsMetaField() == 1);
+            sinkFields.add(sinkField);
 
-            // If no partition field in the ordinary fields, add the partition field to the first position
-            if (!duplicate && StringUtils.isNotEmpty(partitionField)) {
-                FieldInfo fieldInfo = new FieldInfo(partitionField, new TimestampFormatInfo("MILLIS"));
-                sourceFields.add(0, fieldInfo);
-            }
+            FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(),
+                    field.getSourceFieldType(), field.getIsMetaField() == 1);
+            mappingUnitList.add(new FieldMappingUnit(sourceField, sinkField));
         }
 
-        return new FieldMappingRule(fieldMappingUnitList.toArray(new FieldMappingUnit[0]));
-    }
-
-    /**
-     * Get field info by the given field name ant type.
-     *
-     * @apiNote If the field name equals to build-in field, new a build-in field info
-     */
-    private static FieldInfo getFieldInfo(String fieldName, String fieldType) {
-        return getFieldInfo(fieldName, fieldType, false);
+        return mappingUnitList;
     }
 
     /**
@@ -133,33 +113,16 @@ public class FieldInfoUtils {
     }
 
     /**
-     * Get field info by the given field name ant type.
-     *
-     * @apiNote If the field name equals to build-in field, new a build-in field info
+     * Get all migration field mapping unit list for binlog source.
      */
-    private static FieldInfo getSinkFieldInfo(String fieldName, String fieldType,
-            String sourceFieldName, boolean isBuiltin) {
-        FieldInfo fieldInfo;
-        BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(sourceFieldName);
-        FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
-        if (isBuiltin && builtInField != null) {
-            fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
-        } else {
-            fieldInfo = new FieldInfo(fieldName, formatInfo);
-        }
-        return fieldInfo;
-    }
-
-    /**
-     * Get all migration built-in field for binlog source.
-     */
-    public static void setAllMigrationBuiltInField(List<FieldInfo> sourceFields, List<FieldInfo> sinkFields,
-            List<FieldMappingUnit> fieldMappingUnitList) {
-        BuiltInFieldInfo dataField = new BuiltInFieldInfo("data",
-                StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATA);
+    public static List<FieldMappingUnit> setAllMigrationFieldMapping(List<FieldInfo> sourceFields,
+            List<FieldInfo> sinkFields) {
+        List<FieldMappingUnit> mappingUnitList = new ArrayList<>();
+        BuiltInFieldInfo dataField = new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
+                BuiltInField.MYSQL_METADATA_DATA);
         sourceFields.add(dataField);
         sinkFields.add(dataField);
-        fieldMappingUnitList.add(new FieldMappingUnit(dataField, dataField));
+        mappingUnitList.add(new FieldMappingUnit(dataField, dataField));
 
         for (Map.Entry<String, BuiltInField> entry : BUILT_IN_FIELD_MAP.entrySet()) {
             if (entry.getKey().equals("data_time")) {
@@ -169,8 +132,10 @@ public class FieldInfoUtils {
                     StringFormatInfo.INSTANCE, entry.getValue());
             sourceFields.add(fieldInfo);
             sinkFields.add(fieldInfo);
-            fieldMappingUnitList.add(new FieldMappingUnit(fieldInfo, fieldInfo));
+            mappingUnitList.add(new FieldMappingUnit(fieldInfo, fieldInfo));
         }
+
+        return mappingUnitList;
     }
 
     /**
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 568a1d0..9416ad0 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -404,6 +404,7 @@ CREATE TABLE `inlong_stream_field`
     `pre_expression`      varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
@@ -637,19 +638,19 @@ CREATE TABLE `stream_sink_ext`
 DROP TABLE IF EXISTS `stream_sink_field`;
 CREATE TABLE `stream_sink_field`
 (
-    `id`                   int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`      varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id`     varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `sink_id`              int(11)      NOT NULL COMMENT 'Sink id',
-    `sink_type`            varchar(15)  NOT NULL COMMENT 'Sink type',
-    `source_field_name`    varchar(50)   DEFAULT NULL COMMENT 'Source field name',
-    `source_field_type`    varchar(50)   DEFAULT NULL COMMENT 'Source field type',
-    `is_source_meta_field` smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_name`           varchar(50)  NOT NULL COMMENT 'Field name',
-    `field_type`           varchar(50)  NOT NULL COMMENT 'Field type',
-    `field_comment`        varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `rank_num`             smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
-    `is_deleted`           int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`   varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`  varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `sink_id`           int(11)      NOT NULL COMMENT 'Sink id',
+    `sink_type`         varchar(15)  NOT NULL COMMENT 'Sink type',
+    `source_field_name` varchar(50)   DEFAULT NULL COMMENT 'Source field name',
+    `source_field_type` varchar(50)   DEFAULT NULL COMMENT 'Source field type',
+    `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
+    `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
+    `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
 );
 
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 2c54dc0..7b6b41d 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -426,6 +426,7 @@ CREATE TABLE `inlong_stream_field`
     `pre_expression`      varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
@@ -669,19 +670,19 @@ CREATE TABLE `stream_sink_ext`
 DROP TABLE IF EXISTS `stream_sink_field`;
 CREATE TABLE `stream_sink_field`
 (
-    `id`                   int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`      varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id`     varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `sink_id`              int(11)      NOT NULL COMMENT 'Sink id',
-    `sink_type`            varchar(15)  NOT NULL COMMENT 'Sink type',
-    `source_field_name`    varchar(50)   DEFAULT NULL COMMENT 'Source field name',
-    `source_field_type`    varchar(50)   DEFAULT NULL COMMENT 'Source field type',
-    `is_source_meta_field` smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_name`           varchar(50)  NOT NULL COMMENT 'Field name',
-    `field_type`           varchar(50)  NOT NULL COMMENT 'Field type',
-    `field_comment`        varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `rank_num`             smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
-    `is_deleted`           int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`   varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`  varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `sink_id`           int(11)      NOT NULL COMMENT 'Sink id',
+    `sink_type`         varchar(15)  NOT NULL COMMENT 'Sink type',
+    `source_field_name` varchar(50)   DEFAULT NULL COMMENT 'Source field name',
+    `source_field_type` varchar(50)   DEFAULT NULL COMMENT 'Source field type',
+    `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
+    `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
+    `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
+    `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink field table';