You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/28 08:59:35 UTC

[incubator-inlong] branch master updated: [INLONG-4384][Manager] Store the specific field params of the Iceberg to extParams (#4386)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b59976ed1 [INLONG-4384][Manager] Store the specific field params of the Iceberg to extParams (#4386)
b59976ed1 is described below

commit b59976ed19dfb3cf6599b51192557ae6ac934b4b
Author: woofyzhao <49...@qq.com>
AuthorDate: Sat May 28 16:59:29 2022 +0800

    [INLONG-4384][Manager] Store the specific field params of the Iceberg to extParams (#4386)
---
 .../manager/common/enums/IcebergPartition.java     |  5 +-
 .../manager/common/pojo/sink/SinkFieldBase.java    |  2 +-
 .../pojo/sink/iceberg/IcebergColumnInfo.java       | 53 ++++++++++++++++++++--
 .../manager/dao/entity/StreamSinkFieldEntity.java  |  9 +---
 .../mappers/StreamSinkFieldEntityMapper.xml        | 30 ++++--------
 .../resource/es/ElasticsearchResourceOperator.java | 10 ++--
 .../resource/hbase/HbaseResourceOperator.java      |  2 +-
 .../resource/iceberg/IcebergCatalogUtils.java      |  4 ++
 .../resource/iceberg/IcebergResourceOperator.java  |  8 +---
 .../main/resources/sql/apache_inlong_manager.sql   | 34 ++++++--------
 .../manager-web/sql/apache_inlong_manager.sql      | 34 ++++++--------
 11 files changed, 102 insertions(+), 89 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java
index 857cc81dd..ae862b5e8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java
@@ -19,8 +19,6 @@ package org.apache.inlong.manager.common.enums;
 
 import org.apache.inlong.manager.common.util.Preconditions;
 
-import java.util.Locale;
-
 /**
  * Iceberg partition type
  */
@@ -32,6 +30,7 @@ public enum IcebergPartition {
     MONTH,
     DAY,
     HOUR,
+    NONE,
     ;
 
     /**
@@ -40,7 +39,7 @@ public enum IcebergPartition {
     public static IcebergPartition forName(String name) {
         Preconditions.checkNotNull(name, "IcebergPartition should not be null");
         for (IcebergPartition value : values()) {
-            if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
+            if (value.toString().equalsIgnoreCase(name)) {
                 return value;
             }
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
index 3f19f2dff..555109f84 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
@@ -56,7 +56,7 @@ public class SinkFieldBase {
     private String partitionStrategy;
 
     @ApiModelProperty("Extra Param in JSON style")
-    private String extrParam;
+    private String extParams;
 
     @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
             + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
index ef2573086..b7e6a1826 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -17,22 +17,65 @@
 
 package org.apache.inlong.manager.common.pojo.sink.iceberg;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
 
 /**
  * Iceberg column info
  */
 @Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class IcebergColumnInfo {
 
-    private String name;
-    private String type;
-    private String desc;
-    private boolean required;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("Length of fixed type")
     private Integer length;
-    private String partitionStrategy;
+
+    @ApiModelProperty("Precision of decimal type")
     private Integer precision;
+
+    @ApiModelProperty("Scale of decimal type")
     private Integer scale;
+
+    @ApiModelProperty("Field partition strategy, including: None, Identity, Year, Month, Day, Hour, Bucket, Truncate")
+    private String partitionStrategy;
+
+    @ApiModelProperty("Bucket num param of bucket partition")
     private Integer bucketNum;
+
+    @ApiModelProperty("Width param of truncate partition")
     private Integer width;
+
+    // The following are passed from base field and need not be part of API for extra param
+    private String name;
+    private String type;
+    private String desc;
+    private boolean required;
+
+    /**
+     * Get the extra param from the Json
+     */
+    public static IcebergColumnInfo getFromJson(String extParams) {
+        if (StringUtils.isEmpty(extParams)) {
+            return new IcebergColumnInfo();
+        }
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, IcebergColumnInfo.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+    }
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index ca79cbc89..1482c4f07 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -40,14 +40,7 @@ public class StreamSinkFieldEntity implements Serializable {
     private Integer isRequired;
     private String sourceFieldName;
     private String sourceFieldType;
-
-    private Integer fieldLength;
-    private Integer fieldPrecision;
-    private Integer fieldScale;
-    private String partitionStrategy;
-    private Integer bucketNum;
-    private Integer width;
-    private String extrParam;
+    private String extParams;
 
     private Integer isMetaField;
     private String fieldFormat;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 93daa37df..5ed3a362f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -31,11 +31,7 @@
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
         <result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
-        <result column="field_length" jdbcType="INTEGER" property="fieldLength"/>
-        <result column="field_precision" jdbcType="INTEGER" property="fieldPrecision"/>
-        <result column="field_scale" jdbcType="INTEGER" property="fieldScale"/>
-        <result column="partition_strategy" jdbcType="VARCHAR" property="partitionStrategy"/>
-        <result column="extr_param" jdbcType="LONGVARCHAR" property="extrParam"/>
+        <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
         <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
@@ -43,8 +39,7 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, sink_id, field_name, field_type, field_comment, source_field_name, source_field_type,
-        field_length, field_precision, field_scale, partition_strategy, extr_param, is_meta_field, field_format,
-        rank_num, is_deleted
+        ext_params, is_meta_field, field_format, rank_num, is_deleted
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -54,19 +49,16 @@
                                        sink_type, field_name,
                                        field_type, field_comment,
                                        source_field_name, source_field_type,
-                                       field_length, field_precision,
-                                       field_scale, partition_strategy, extr_param,
-                                       is_meta_field, field_format,
+                                       ext_params, is_meta_field, field_format,
                                        rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
                 #{inlongStreamId,jdbcType=VARCHAR}, #{sinkId,jdbcType=INTEGER},
                 #{sinkType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
                 #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
                 #{sourceFieldName,jdbcType=VARCHAR}, #{sourceFieldType,jdbcType=VARCHAR},
-                #{fieldLength,jdbcType=INTEGER}, #{fieldPrecision,jdbcType=INTEGER},
-                #{fieldScale,jdbcType=INTEGER}, #{partitionStrategy,jdbcType=VARCHAR}, #{extrParam,jdbcType=LONGVARCHAR},
-                #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
-                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+                #{extParams,jdbcType=LONGVARCHAR}, #{isMetaField,jdbcType=SMALLINT},
+                #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT},
+                #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertAll">
         insert into stream_sink_field (
@@ -75,9 +67,7 @@
         sink_type, field_name,
         field_type, field_comment,
         source_field_name, source_field_type,
-        field_length, field_precision,
-        field_scale, partition_strategy, extr_param,
-        is_meta_field, field_format,
+        ext_params, is_meta_field, field_format,
         rank_num, is_deleted
         )
         values
@@ -88,9 +78,7 @@
             #{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
             #{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
-            #{item.fieldLength,jdbcType=INTEGER}, #{item.fieldPrecision,jdbcType=INTEGER},
-            #{item.fieldScale,jdbcType=INTEGER}, #{item.partitionStrategy,jdbcType=VARCHAR},
-            #{item.extrParam,jdbcType=LONGVARCHAR},
+            #{item.extParams,jdbcType=LONGVARCHAR},
             #{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
             #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
             )
@@ -134,4 +122,4 @@
         from stream_sink_field
         where sink_id = #{sinkId,jdbcType=INTEGER}
     </delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
index 150f638a2..f5cc4bf0f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
@@ -131,11 +131,11 @@ public class ElasticsearchResourceOperator implements SinkResourceOperator {
             fieldInfo.setName(entry.getFieldName());
             fieldInfo.setType(entry.getFieldType());
             fieldInfo.setFormat(entry.getFieldFormat());
-            ElasticsearchFieldInfo filedExtrParam =
-                    ElasticsearchFieldInfo.getFromJson(entry.getExtrParam());
-            fieldInfo.setScalingFactor(filedExtrParam.getScalingFactor());
-            fieldInfo.setAnalyzer(filedExtrParam.getAnalyzer());
-            fieldInfo.setSearchAnalyzer(filedExtrParam.getSearchAnalyzer());
+            ElasticsearchFieldInfo fieldExtParams =
+                    ElasticsearchFieldInfo.getFromJson(entry.getExtParams());
+            fieldInfo.setScalingFactor(fieldExtParams.getScalingFactor());
+            fieldInfo.setAnalyzer(fieldExtParams.getAnalyzer());
+            fieldInfo.setSearchAnalyzer(fieldExtParams.getSearchAnalyzer());
             fieldList.add(fieldInfo);
         }
         return fieldList;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java
index 6264459d4..1be7412a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java
@@ -139,7 +139,7 @@ public class HbaseResourceOperator implements SinkResourceOperator {
 
         List<HbaseColumnFamilyInfo> columnFamilies = new ArrayList<>();
         for (StreamSinkFieldEntity field : fieldList) {
-            HbaseColumnFamilyInfo columnFamily = HbaseColumnFamilyInfo.getFromJson(field.getExtrParam());
+            HbaseColumnFamilyInfo columnFamily = HbaseColumnFamilyInfo.getFromJson(field.getExtParams());
             if (seen.contains(columnFamily.getCfName())) {
                 continue;
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java
index 1d44c09b3..7543c94eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java
@@ -218,6 +218,8 @@ public class IcebergCatalogUtils {
             case HOUR:
                 builder.hour(column.getName());
                 break;
+            case NONE:
+                break;
             default:
                 throw new IllegalArgumentException(
                         "unknown iceberg partition strategy: " + column.getPartitionStrategy());
@@ -255,6 +257,8 @@ public class IcebergCatalogUtils {
             case HOUR:
                 builder.addField(Expressions.hour(column.getName()));
                 break;
+            case NONE:
+                break;
             default:
                 throw new IllegalArgumentException(
                         "unknown iceberg partition strategy: " + column.getPartitionStrategy());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
index 20b5fab13..7f3e1e4bb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
@@ -131,17 +131,11 @@ public class IcebergResourceOperator implements SinkResourceOperator {
         // set columns
         List<IcebergColumnInfo> columnList = new ArrayList<>();
         for (StreamSinkFieldEntity field : fieldList) {
-            IcebergColumnInfo column = new IcebergColumnInfo();
+            IcebergColumnInfo column = IcebergColumnInfo.getFromJson(field.getExtParams());
             column.setName(field.getFieldName());
             column.setType(field.getFieldType());
             column.setDesc(field.getFieldComment());
             column.setRequired(field.getIsRequired() != null && field.getIsRequired() > 0);
-            column.setPartitionStrategy(field.getPartitionStrategy());
-            column.setLength(field.getFieldLength());
-            column.setPrecision(field.getFieldPrecision());
-            column.setScale(field.getFieldScale());
-            column.setBucketNum(field.getBucketNum());
-            column.setWidth(field.getWidth());
             columnList.add(column);
         }
 
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 73e564d25..2174e1b01 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -634,25 +634,21 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `stream_sink_field`
 (
-    `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id`   varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `sink_id`            int(11)      NOT NULL COMMENT 'Sink id',
-    `sink_type`          varchar(15)  NOT NULL COMMENT 'Sink type',
-    `source_field_name`  varchar(50)   DEFAULT NULL COMMENT 'Source field name',
-    `source_field_type`  varchar(50)   DEFAULT NULL COMMENT 'Source field type',
-    `field_name`         varchar(50)  NOT NULL COMMENT 'Field name',
-    `field_type`         varchar(50)  NOT NULL COMMENT 'Field type',
-    `field_comment`      varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `field_length`       int(4)        DEFAULT NULL COMMENT 'Field length',
-    `field_precision`    int(4)        DEFAULT NULL COMMENT 'Field precision',
-    `field_scale`        int(4)        DEFAULT NULL COMMENT 'Field scale',
-    `partition_strategy` varchar(20)   DEFAULT NULL COMMENT 'Field partition strategy',
-    `extr_param`         text COMMENT 'Field extr param',
-    `is_meta_field`      smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_format`       varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
-    `rank_num`           smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
-    `is_deleted`         int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`   varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`  varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `sink_id`           int(11)      NOT NULL COMMENT 'Sink id',
+    `sink_type`         varchar(15)  NOT NULL COMMENT 'Sink type',
+    `source_field_name` varchar(50)   DEFAULT NULL COMMENT 'Source field name',
+    `source_field_type` varchar(50)   DEFAULT NULL COMMENT 'Source field type',
+    `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
+    `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
+    `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
+    `ext_params`        text COMMENT 'Field ext params',
+    `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+    `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
 );
 
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 72d352272..b0e06ec02 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -670,25 +670,21 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `stream_sink_field`
 (
-    `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id`   varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `sink_id`            int(11)      NOT NULL COMMENT 'Sink id',
-    `sink_type`          varchar(15)  NOT NULL COMMENT 'Sink type',
-    `source_field_name`  varchar(50)   DEFAULT NULL COMMENT 'Source field name',
-    `source_field_type`  varchar(50)   DEFAULT NULL COMMENT 'Source field type',
-    `field_name`         varchar(50)  NOT NULL COMMENT 'Field name',
-    `field_type`         varchar(50)  NOT NULL COMMENT 'Field type',
-    `field_comment`      varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `field_length`       int(4)        DEFAULT NULL COMMENT 'Field length',
-    `field_precision`    int(4)        DEFAULT NULL COMMENT 'Field precision',
-    `field_scale`        int(4)        DEFAULT NULL COMMENT 'Field scale',
-    `partition_strategy` varchar(20)   DEFAULT NULL COMMENT 'Field partition strategy',
-    `extr_param`         text COMMENT 'Field extr param',
-    `is_meta_field`      smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
-    `field_format`       varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
-    `rank_num`           smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
-    `is_deleted`         int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`   varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`  varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `sink_id`           int(11)      NOT NULL COMMENT 'Sink id',
+    `sink_type`         varchar(15)  NOT NULL COMMENT 'Sink type',
+    `source_field_name` varchar(50)   DEFAULT NULL COMMENT 'Source field name',
+    `source_field_type` varchar(50)   DEFAULT NULL COMMENT 'Source field type',
+    `field_name`        varchar(50)  NOT NULL COMMENT 'Field name',
+    `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
+    `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
+    `ext_params`        text COMMENT 'Field ext params',
+    `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+    `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink field table';