You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/28 08:59:35 UTC
[incubator-inlong] branch master updated: [INLONG-4384][Manager] Store the specific field params of the Iceberg to extParams (#4386)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b59976ed1 [INLONG-4384][Manager] Store the specific field params of the Iceberg to extParams (#4386)
b59976ed1 is described below
commit b59976ed19dfb3cf6599b51192557ae6ac934b4b
Author: woofyzhao <49...@qq.com>
AuthorDate: Sat May 28 16:59:29 2022 +0800
[INLONG-4384][Manager] Store the specific field params of the Iceberg to extParams (#4386)
---
.../manager/common/enums/IcebergPartition.java | 5 +-
.../manager/common/pojo/sink/SinkFieldBase.java | 2 +-
.../pojo/sink/iceberg/IcebergColumnInfo.java | 53 ++++++++++++++++++++--
.../manager/dao/entity/StreamSinkFieldEntity.java | 9 +---
.../mappers/StreamSinkFieldEntityMapper.xml | 30 ++++--------
.../resource/es/ElasticsearchResourceOperator.java | 10 ++--
.../resource/hbase/HbaseResourceOperator.java | 2 +-
.../resource/iceberg/IcebergCatalogUtils.java | 4 ++
.../resource/iceberg/IcebergResourceOperator.java | 8 +---
.../main/resources/sql/apache_inlong_manager.sql | 34 ++++++--------
.../manager-web/sql/apache_inlong_manager.sql | 34 ++++++--------
11 files changed, 102 insertions(+), 89 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java
index 857cc81dd..ae862b5e8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/IcebergPartition.java
@@ -19,8 +19,6 @@ package org.apache.inlong.manager.common.enums;
import org.apache.inlong.manager.common.util.Preconditions;
-import java.util.Locale;
-
/**
* Iceberg partition type
*/
@@ -32,6 +30,7 @@ public enum IcebergPartition {
MONTH,
DAY,
HOUR,
+ NONE,
;
/**
@@ -40,7 +39,7 @@ public enum IcebergPartition {
public static IcebergPartition forName(String name) {
Preconditions.checkNotNull(name, "IcebergPartition should not be null");
for (IcebergPartition value : values()) {
- if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
+ if (value.toString().equalsIgnoreCase(name)) {
return value;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
index 3f19f2dff..555109f84 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
@@ -56,7 +56,7 @@ public class SinkFieldBase {
private String partitionStrategy;
@ApiModelProperty("Extra Param in JSON style")
- private String extrParam;
+ private String extParams;
@ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
index ef2573086..b7e6a1826 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -17,22 +17,65 @@
package org.apache.inlong.manager.common.pojo.sink.iceberg;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
/**
* Iceberg column info
*/
@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class IcebergColumnInfo {
- private String name;
- private String type;
- private String desc;
- private boolean required;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty("Length of fixed type")
private Integer length;
- private String partitionStrategy;
+
+ @ApiModelProperty("Precision of decimal type")
private Integer precision;
+
+ @ApiModelProperty("Scale of decimal type")
private Integer scale;
+
+ @ApiModelProperty("Field partition strategy, including: None, Identity, Year, Month, Day, Hour, Bucket, Truncate")
+ private String partitionStrategy;
+
+ @ApiModelProperty("Bucket num param of bucket partition")
private Integer bucketNum;
+
+ @ApiModelProperty("Width param of truncate partition")
private Integer width;
+
+ // The following are passed from base field and need not be part of API for extra param
+ private String name;
+ private String type;
+ private String desc;
+ private boolean required;
+
+ /**
+ * Get the extra param from the Json
+ */
+ public static IcebergColumnInfo getFromJson(String extParams) {
+ if (StringUtils.isEmpty(extParams)) {
+ return new IcebergColumnInfo();
+ }
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, IcebergColumnInfo.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ }
+ }
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index ca79cbc89..1482c4f07 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -40,14 +40,7 @@ public class StreamSinkFieldEntity implements Serializable {
private Integer isRequired;
private String sourceFieldName;
private String sourceFieldType;
-
- private Integer fieldLength;
- private Integer fieldPrecision;
- private Integer fieldScale;
- private String partitionStrategy;
- private Integer bucketNum;
- private Integer width;
- private String extrParam;
+ private String extParams;
private Integer isMetaField;
private String fieldFormat;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 93daa37df..5ed3a362f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -31,11 +31,7 @@
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
<result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
- <result column="field_length" jdbcType="INTEGER" property="fieldLength"/>
- <result column="field_precision" jdbcType="INTEGER" property="fieldPrecision"/>
- <result column="field_scale" jdbcType="INTEGER" property="fieldScale"/>
- <result column="partition_strategy" jdbcType="VARCHAR" property="partitionStrategy"/>
- <result column="extr_param" jdbcType="LONGVARCHAR" property="extrParam"/>
+ <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
<result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
@@ -43,8 +39,7 @@
</resultMap>
<sql id="Base_Column_List">
id, sink_id, field_name, field_type, field_comment, source_field_name, source_field_type,
- field_length, field_precision, field_scale, partition_strategy, extr_param, is_meta_field, field_format,
- rank_num, is_deleted
+ ext_params, is_meta_field, field_format, rank_num, is_deleted
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -54,19 +49,16 @@
sink_type, field_name,
field_type, field_comment,
source_field_name, source_field_type,
- field_length, field_precision,
- field_scale, partition_strategy, extr_param,
- is_meta_field, field_format,
+ ext_params, is_meta_field, field_format,
rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR}, #{sinkId,jdbcType=INTEGER},
#{sinkType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
#{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
#{sourceFieldName,jdbcType=VARCHAR}, #{sourceFieldType,jdbcType=VARCHAR},
- #{fieldLength,jdbcType=INTEGER}, #{fieldPrecision,jdbcType=INTEGER},
- #{fieldScale,jdbcType=INTEGER}, #{partitionStrategy,jdbcType=VARCHAR}, #{extrParam,jdbcType=LONGVARCHAR},
- #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ #{extParams,jdbcType=LONGVARCHAR}, #{isMetaField,jdbcType=SMALLINT},
+ #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT},
+ #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertAll">
insert into stream_sink_field (
@@ -75,9 +67,7 @@
sink_type, field_name,
field_type, field_comment,
source_field_name, source_field_type,
- field_length, field_precision,
- field_scale, partition_strategy, extr_param,
- is_meta_field, field_format,
+ ext_params, is_meta_field, field_format,
rank_num, is_deleted
)
values
@@ -88,9 +78,7 @@
#{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
#{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
- #{item.fieldLength,jdbcType=INTEGER}, #{item.fieldPrecision,jdbcType=INTEGER},
- #{item.fieldScale,jdbcType=INTEGER}, #{item.partitionStrategy,jdbcType=VARCHAR},
- #{item.extrParam,jdbcType=LONGVARCHAR},
+ #{item.extParams,jdbcType=LONGVARCHAR},
#{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
#{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
)
@@ -134,4 +122,4 @@
from stream_sink_field
where sink_id = #{sinkId,jdbcType=INTEGER}
</delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
index 150f638a2..f5cc4bf0f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
@@ -131,11 +131,11 @@ public class ElasticsearchResourceOperator implements SinkResourceOperator {
fieldInfo.setName(entry.getFieldName());
fieldInfo.setType(entry.getFieldType());
fieldInfo.setFormat(entry.getFieldFormat());
- ElasticsearchFieldInfo filedExtrParam =
- ElasticsearchFieldInfo.getFromJson(entry.getExtrParam());
- fieldInfo.setScalingFactor(filedExtrParam.getScalingFactor());
- fieldInfo.setAnalyzer(filedExtrParam.getAnalyzer());
- fieldInfo.setSearchAnalyzer(filedExtrParam.getSearchAnalyzer());
+ ElasticsearchFieldInfo fieldExtParams =
+ ElasticsearchFieldInfo.getFromJson(entry.getExtParams());
+ fieldInfo.setScalingFactor(fieldExtParams.getScalingFactor());
+ fieldInfo.setAnalyzer(fieldExtParams.getAnalyzer());
+ fieldInfo.setSearchAnalyzer(fieldExtParams.getSearchAnalyzer());
fieldList.add(fieldInfo);
}
return fieldList;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java
index 6264459d4..1be7412a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HbaseResourceOperator.java
@@ -139,7 +139,7 @@ public class HbaseResourceOperator implements SinkResourceOperator {
List<HbaseColumnFamilyInfo> columnFamilies = new ArrayList<>();
for (StreamSinkFieldEntity field : fieldList) {
- HbaseColumnFamilyInfo columnFamily = HbaseColumnFamilyInfo.getFromJson(field.getExtrParam());
+ HbaseColumnFamilyInfo columnFamily = HbaseColumnFamilyInfo.getFromJson(field.getExtParams());
if (seen.contains(columnFamily.getCfName())) {
continue;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java
index 1d44c09b3..7543c94eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergCatalogUtils.java
@@ -218,6 +218,8 @@ public class IcebergCatalogUtils {
case HOUR:
builder.hour(column.getName());
break;
+ case NONE:
+ break;
default:
throw new IllegalArgumentException(
"unknown iceberg partition strategy: " + column.getPartitionStrategy());
@@ -255,6 +257,8 @@ public class IcebergCatalogUtils {
case HOUR:
builder.addField(Expressions.hour(column.getName()));
break;
+ case NONE:
+ break;
default:
throw new IllegalArgumentException(
"unknown iceberg partition strategy: " + column.getPartitionStrategy());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
index 20b5fab13..7f3e1e4bb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
@@ -131,17 +131,11 @@ public class IcebergResourceOperator implements SinkResourceOperator {
// set columns
List<IcebergColumnInfo> columnList = new ArrayList<>();
for (StreamSinkFieldEntity field : fieldList) {
- IcebergColumnInfo column = new IcebergColumnInfo();
+ IcebergColumnInfo column = IcebergColumnInfo.getFromJson(field.getExtParams());
column.setName(field.getFieldName());
column.setType(field.getFieldType());
column.setDesc(field.getFieldComment());
column.setRequired(field.getIsRequired() != null && field.getIsRequired() > 0);
- column.setPartitionStrategy(field.getPartitionStrategy());
- column.setLength(field.getFieldLength());
- column.setPrecision(field.getFieldPrecision());
- column.setScale(field.getFieldScale());
- column.setBucketNum(field.getBucketNum());
- column.setWidth(field.getWidth());
columnList.add(column);
}
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 73e564d25..2174e1b01 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -634,25 +634,21 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
-- ----------------------------
CREATE TABLE IF NOT EXISTS `stream_sink_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `sink_id` int(11) NOT NULL COMMENT 'Sink id',
- `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
- `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
- `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
- `field_name` varchar(50) NOT NULL COMMENT 'Field name',
- `field_type` varchar(50) NOT NULL COMMENT 'Field type',
- `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `field_length` int(4) DEFAULT NULL COMMENT 'Field length',
- `field_precision` int(4) DEFAULT NULL COMMENT 'Field precision',
- `field_scale` int(4) DEFAULT NULL COMMENT 'Field scale',
- `partition_strategy` varchar(20) DEFAULT NULL COMMENT 'Field partition strategy',
- `extr_param` text COMMENT 'Field extr param',
- `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `sink_id` int(11) NOT NULL COMMENT 'Sink id',
+ `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
+ `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
+ `field_name` varchar(50) NOT NULL COMMENT 'Field name',
+ `field_type` varchar(50) NOT NULL COMMENT 'Field type',
+ `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
+ `ext_params` text COMMENT 'Field ext params',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
);
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 72d352272..b0e06ec02 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -670,25 +670,21 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
-- ----------------------------
CREATE TABLE IF NOT EXISTS `stream_sink_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `sink_id` int(11) NOT NULL COMMENT 'Sink id',
- `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
- `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
- `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
- `field_name` varchar(50) NOT NULL COMMENT 'Field name',
- `field_type` varchar(50) NOT NULL COMMENT 'Field type',
- `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `field_length` int(4) DEFAULT NULL COMMENT 'Field length',
- `field_precision` int(4) DEFAULT NULL COMMENT 'Field precision',
- `field_scale` int(4) DEFAULT NULL COMMENT 'Field scale',
- `partition_strategy` varchar(20) DEFAULT NULL COMMENT 'Field partition strategy',
- `extr_param` text COMMENT 'Field extr param',
- `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `sink_id` int(11) NOT NULL COMMENT 'Sink id',
+ `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
+ `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
+ `field_name` varchar(50) NOT NULL COMMENT 'Field name',
+ `field_type` varchar(50) NOT NULL COMMENT 'Field type',
+ `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
+ `ext_params` text COMMENT 'Field ext params',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink field table';