You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/26 07:56:38 UTC
[incubator-inlong] branch master updated: [INLONG-3367][Manager] Support custom field format (#3375)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c0046fa [INLONG-3367][Manager] Support custom field format (#3375)
c0046fa is described below
commit c0046fa96c125ca3e84387937b3ef7f94a6f5d41
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Sat Mar 26 15:56:31 2022 +0800
[INLONG-3367][Manager] Support custom field format (#3375)
Co-authored-by: yunqingmo <yu...@tencent.com>
---
.../manager/common/pojo/sink/SinkFieldRequest.java | 4 ++
.../common/pojo/sink/SinkFieldResponse.java | 4 ++
.../common/pojo/stream/InlongStreamFieldInfo.java | 4 ++
.../dao/entity/InlongStreamFieldEntity.java | 1 +
.../manager/dao/entity/StreamSinkFieldEntity.java | 1 +
.../mappers/InlongStreamFieldEntityMapper.xml | 15 +++--
.../mappers/StreamSinkFieldEntityMapper.xml | 12 ++--
.../thirdparty/sort/util/FieldInfoUtils.java | 64 +++++++++++++++++++---
.../main/resources/sql/apache_inlong_manager.sql | 2 +
.../manager-web/sql/apache_inlong_manager.sql | 2 +
10 files changed, 90 insertions(+), 19 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index d5d920a..d4a7073 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -49,6 +49,10 @@ public class SinkFieldRequest {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
+ @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ private String fieldFormat;
+
@ApiModelProperty("Field order")
private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index 8933a25..cfabcfc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -54,6 +54,10 @@ public class SinkFieldResponse {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
+ @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ private String fieldFormat;
+
@ApiModelProperty("Field order")
private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
index d10873f..d220267 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
@@ -57,6 +57,10 @@ public class InlongStreamFieldInfo {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
+ @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+ private String fieldFormat;
+
@ApiModelProperty(value = "field rank num")
private Short rankNum;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
index 0109030..e8fd028 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
@@ -35,6 +35,7 @@ public class InlongStreamFieldEntity implements Serializable {
private String fieldType;
private String fieldComment;
private Integer isMetaField;
+ private String fieldFormat;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index a32a123..b2ac034 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -38,6 +38,7 @@ public class StreamSinkFieldEntity implements Serializable {
private String sourceFieldName;
private String sourceFieldType;
private Integer isMetaField;
+ private String fieldFormat;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index 93ff35c..ab354cb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -31,37 +31,40 @@
<result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
<result column="rank_num" jdbcType="INTEGER" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
- pre_expression, field_type, field_comment, is_meta_field, rank_num, is_deleted
+ pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num, is_deleted
</sql>
<insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
insert into inlong_stream_field (id, inlong_group_id, inlong_stream_id,
is_predefined_field, field_name, field_value,
pre_expression, field_type, field_comment,
- is_meta_field, rank_num, is_deleted)
+ is_meta_field, field_format, rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{isPredefinedField,jdbcType=INTEGER}, #{fieldName,jdbcType=VARCHAR}, #{fieldValue,jdbcType=VARCHAR},
#{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
- #{isMetaField,jdbcType=SMALLINT}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+ #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<!-- Bulk insert, update if it exists -->
<insert id="insertAll" parameterType="java.util.List">
insert into inlong_stream_field (
id, inlong_group_id, inlong_stream_id, is_predefined_field,
field_name, field_value, pre_expression, field_type,
- field_comment, is_meta_field, rank_num, is_deleted
+ field_comment, is_meta_field, field_format, rank_num, is_deleted
)
values
<foreach collection="fieldList" index="index" item="item" separator=",">
(
#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
#{item.fieldName}, #{item.fieldValue}, #{item.preExpression}, #{item.fieldType},
- #{item.fieldComment}, #{item.isMetaField}, #{item.rankNum}, #{item.isDeleted}
+ #{item.fieldComment}, #{item.isMetaField}, #{item.fieldFormat},
+ #{item.rankNum}, #{item.isDeleted}
)
</foreach>
ON DUPLICATE KEY UPDATE
@@ -74,6 +77,7 @@
pre_expression = values(pre_expression),
field_type = values(field_type),
is_meta_field = values(is_meta_field),
+ field_format = values(field_format),
field_comment = values(field_comment),
rank_num = values(rank_num),
is_deleted = values(is_deleted)
@@ -116,6 +120,7 @@
pre_expression = #{preExpression,jdbcType=VARCHAR},
field_type = #{fieldType,jdbcType=VARCHAR},
is_meta_field = #(isMetaField,jdbcType=SMALLINT),
+ field_format = #{fieldFormat,jdbcType=VARCHAR},
field_comment = #{fieldComment,jdbcType=VARCHAR},
rank_num = #{rankNum,jdbcType=SMALLINT},
is_deleted = #{isDeleted,jdbcType=INTEGER}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index c9e495f..132c356 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -32,12 +32,13 @@
<result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
<result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
id, sink_id, field_name, field_type, field_comment,
- source_field_name, source_field_type, is_meta_field, rank_num, is_deleted
+ source_field_name, source_field_type, is_meta_field, field_format, rank_num, is_deleted
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -45,13 +46,13 @@
insert into stream_sink_field (id, inlong_group_id, inlong_stream_id,
sink_id, sink_type, field_name,
field_type, field_comment, source_field_name,
- source_field_type, is_meta_field,
+ source_field_type, is_meta_field, field_format,
rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sinkId,jdbcType=INTEGER}, #{sinkType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
#{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR}, #{sourceFieldName,jdbcType=VARCHAR},
#{sourceFieldType,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertAll">
insert into stream_sink_field (
@@ -60,7 +61,7 @@
sink_type, field_name,
field_type, field_comment,
source_field_name, source_field_type,
- is_meta_field, rank_num, is_deleted
+ is_meta_field, field_format, rank_num, is_deleted
)
values
<foreach collection="list" index="index" item="item" separator=",">
@@ -70,7 +71,8 @@
#{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
#{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
- #{item.isMetaField,jdbcType=SMALLINT}, #{item.rankNum,jdbcType=SMALLINT},
+ #{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
+ #{item.rankNum,jdbcType=SMALLINT},
#{item.isDeleted,jdbcType=INTEGER}
)
</foreach>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
index a14276e..fb3512d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.thirdparty.sort.util;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.MetaFieldType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
@@ -76,7 +77,7 @@ public class FieldInfoUtils {
// Set source field info list.
for (InlongStreamFieldInfo field : streamFieldList) {
FieldInfo sourceField = getFieldInfo(field.getFieldName(), field.getFieldType(),
- field.getIsMetaField() == 1);
+ field.getIsMetaField() == 1, field.getFieldFormat());
sourceFields.add(sourceField);
}
@@ -84,11 +85,11 @@ public class FieldInfoUtils {
// Get sink field info list, if the field name equals to build-in field, new a build-in field info
for (SinkFieldResponse field : fieldList) {
FieldInfo sinkField = getFieldInfo(field.getFieldName(), field.getFieldType(),
- field.getIsMetaField() == 1);
+ field.getIsMetaField() == 1, field.getFieldFormat());
sinkFields.add(sinkField);
FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(),
- field.getSourceFieldType(), field.getIsMetaField() == 1);
+ field.getSourceFieldType(), field.getIsMetaField() == 1, field.getFieldFormat());
mappingUnitList.add(new FieldMappingUnit(sourceField, sinkField));
}
@@ -100,10 +101,10 @@ public class FieldInfoUtils {
*
* @apiNote If the field name equals to build-in field, new a build-in field info
*/
- private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin) {
+ private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin, String format) {
FieldInfo fieldInfo;
BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
- FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
+ FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase(), format);
if (isBuiltin && builtInField != null) {
fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
} else {
@@ -139,12 +140,22 @@ public class FieldInfoUtils {
}
/**
- * Get the FieldFormat of Sort according to type string
+ * Get the FieldFormat of Sort according to type string, without a custom field format
*
* @param type type string
* @return Sort field format instance
*/
public static FormatInfo convertFieldFormat(String type) {
+ return convertFieldFormat(type, null);
+ }
+
+ /**
+ * Get the FieldFormat of Sort according to type string and field format (format is applied to DATE, TIME and TIMESTAMP when not blank)
+ *
+ * @param type type string
+ * @return Sort field format instance
+ */
+ public static FormatInfo convertFieldFormat(String type, String format) {
FormatInfo formatInfo;
FieldType fieldType = FieldType.forName(type);
switch (fieldType) {
@@ -176,13 +187,25 @@ public class FieldInfoUtils {
formatInfo = new DecimalFormatInfo();
break;
case DATE:
- formatInfo = new DateFormatInfo();
+ if (StringUtils.isNotBlank(format)) {
+ formatInfo = new DateFormatInfo(convertToSortFormat(format));
+ } else {
+ formatInfo = new DateFormatInfo();
+ }
break;
case TIME:
- formatInfo = new TimeFormatInfo();
+ if (StringUtils.isNotBlank(format)) {
+ formatInfo = new TimeFormatInfo(convertToSortFormat(format));
+ } else {
+ formatInfo = new TimeFormatInfo();
+ }
break;
case TIMESTAMP:
- formatInfo = new TimestampFormatInfo();
+ if (StringUtils.isNotBlank(format)) {
+ formatInfo = new TimestampFormatInfo(convertToSortFormat(format));
+ } else {
+ formatInfo = new TimestampFormatInfo();
+ }
break;
case BINARY:
case FIXED:
@@ -195,4 +218,27 @@ public class FieldInfoUtils {
return formatInfo;
}
+ /**
+ * Translate a field format name into the name handed to the Sort format-info constructors.
+ *
+ * @param format format name or custom pattern; the callers visible here pass a non-blank value
+ * @return "MICROS" for "MICROSECONDS", "MILLIS" for "MILLISECONDS", otherwise the input unchanged
+ */
+ private static String convertToSortFormat(String format) {
+ String sortFormat = format; // fall back to the input for any value not mapped below
+ switch (format) {
+ case "MICROSECONDS":
+ sortFormat = "MICROS";
+ break;
+ case "MILLISECONDS":
+ sortFormat = "MILLIS";
+ break;
+ case "SECONDS":
+ sortFormat = "SECONDS"; // same name on both sides; listed explicitly alongside the other units
+ break;
+ default: // anything else (e.g. SQL, ISO_8601, custom patterns) passes through as-is
+ }
+ return sortFormat;
+ }
+
}
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 9416ad0..9598450 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -405,6 +405,7 @@ CREATE TABLE `inlong_stream_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) NOT NULL COMMENT 'Field format',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
@@ -649,6 +650,7 @@ CREATE TABLE `stream_sink_field`
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) NOT NULL COMMENT 'Field format',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 7b6b41d..866a6cb 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -427,6 +427,7 @@ CREATE TABLE `inlong_stream_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) NOT NULL COMMENT 'Field format',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
@@ -681,6 +682,7 @@ CREATE TABLE `stream_sink_field`
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) NOT NULL COMMENT 'Field format',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)