You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/26 07:56:38 UTC

[incubator-inlong] branch master updated: [INLONG-3367][Manager] Support custom field format (#3375)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c0046fa  [INLONG-3367][Manager] Support custom field format  (#3375)
c0046fa is described below

commit c0046fa96c125ca3e84387937b3ef7f94a6f5d41
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Sat Mar 26 15:56:31 2022 +0800

    [INLONG-3367][Manager] Support custom field format  (#3375)
    
    Co-authored-by: yunqingmo <yu...@tencent.com>
---
 .../manager/common/pojo/sink/SinkFieldRequest.java |  4 ++
 .../common/pojo/sink/SinkFieldResponse.java        |  4 ++
 .../common/pojo/stream/InlongStreamFieldInfo.java  |  4 ++
 .../dao/entity/InlongStreamFieldEntity.java        |  1 +
 .../manager/dao/entity/StreamSinkFieldEntity.java  |  1 +
 .../mappers/InlongStreamFieldEntityMapper.xml      | 15 +++--
 .../mappers/StreamSinkFieldEntityMapper.xml        | 12 ++--
 .../thirdparty/sort/util/FieldInfoUtils.java       | 64 +++++++++++++++++++---
 .../main/resources/sql/apache_inlong_manager.sql   |  2 +
 .../manager-web/sql/apache_inlong_manager.sql      |  2 +
 10 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index d5d920a..d4a7073 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -49,6 +49,10 @@ public class SinkFieldRequest {
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
+    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    private String fieldFormat;
+
     @ApiModelProperty("Field order")
     private Short rankNum;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index 8933a25..cfabcfc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -54,6 +54,10 @@ public class SinkFieldResponse {
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
+    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    private String fieldFormat;
+
     @ApiModelProperty("Field order")
     private Short rankNum;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
index d10873f..d220267 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
@@ -57,6 +57,10 @@ public class InlongStreamFieldInfo {
     @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
     private Integer isMetaField = 0;
 
+    @ApiModelProperty("Field format,including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss' etc,maybe this is mainly used for time format")
+    private String fieldFormat;
+
     @ApiModelProperty(value = "field rank num")
     private Short rankNum;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
index 0109030..e8fd028 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
@@ -35,6 +35,7 @@ public class InlongStreamFieldEntity implements Serializable {
     private String fieldType;
     private String fieldComment;
     private Integer isMetaField;
+    private String fieldFormat;
     private Short rankNum;
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index a32a123..b2ac034 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -38,6 +38,7 @@ public class StreamSinkFieldEntity implements Serializable {
     private String sourceFieldName;
     private String sourceFieldType;
     private Integer isMetaField;
+    private String fieldFormat;
     private Short rankNum;
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index 93ff35c..ab354cb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -31,37 +31,40 @@
         <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
         <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
         <result column="rank_num" jdbcType="INTEGER" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
-        pre_expression, field_type, field_comment, is_meta_field, rank_num, is_deleted
+        pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num, is_deleted
     </sql>
 
     <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
         insert into inlong_stream_field (id, inlong_group_id, inlong_stream_id,
                                          is_predefined_field, field_name, field_value,
                                          pre_expression, field_type, field_comment,
-                                         is_meta_field, rank_num, is_deleted)
+                                         is_meta_field, field_format, rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{isPredefinedField,jdbcType=INTEGER}, #{fieldName,jdbcType=VARCHAR}, #{fieldValue,jdbcType=VARCHAR},
                 #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
-                #{isMetaField,jdbcType=SMALLINT}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+                #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <!-- Bulk insert, update if it exists -->
     <insert id="insertAll" parameterType="java.util.List">
         insert into inlong_stream_field (
         id, inlong_group_id, inlong_stream_id, is_predefined_field,
         field_name, field_value, pre_expression, field_type,
-        field_comment, is_meta_field, rank_num, is_deleted
+        field_comment, is_meta_field, field_format, rank_num, is_deleted
         )
         values
         <foreach collection="fieldList" index="index" item="item" separator=",">
             (
             #{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
             #{item.fieldName}, #{item.fieldValue}, #{item.preExpression}, #{item.fieldType},
-            #{item.fieldComment}, #{item.isMetaField}, #{item.rankNum}, #{item.isDeleted}
+            #{item.fieldComment}, #{item.isMetaField}, #{item.fieldFormat},
+            #{item.rankNum}, #{item.isDeleted}
             )
         </foreach>
         ON DUPLICATE KEY UPDATE
@@ -74,6 +77,7 @@
         pre_expression = values(pre_expression),
         field_type = values(field_type),
         is_meta_field = values(is_meta_field),
+        field_format = values(field_format),
         field_comment = values(field_comment),
         rank_num = values(rank_num),
         is_deleted = values(is_deleted)
@@ -116,6 +120,7 @@
             pre_expression      = #{preExpression,jdbcType=VARCHAR},
             field_type          = #{fieldType,jdbcType=VARCHAR},
             is_meta_field       = #(isMetaField,jdbcType=SMALLINT),
+            field_format        = #{fieldFormat,jdbcType=VARCHAR},
             field_comment       = #{fieldComment,jdbcType=VARCHAR},
             rank_num            = #{rankNum,jdbcType=SMALLINT},
             is_deleted          = #{isDeleted,jdbcType=INTEGER}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index c9e495f..132c356 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -32,12 +32,13 @@
         <result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
         <result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
         <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+        <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
         <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
     </resultMap>
     <sql id="Base_Column_List">
         id, sink_id, field_name, field_type, field_comment,
-        source_field_name, source_field_type, is_meta_field, rank_num, is_deleted
+        source_field_name, source_field_type, is_meta_field, field_format, rank_num, is_deleted
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -45,13 +46,13 @@
         insert into stream_sink_field (id, inlong_group_id, inlong_stream_id,
                                        sink_id, sink_type, field_name,
                                        field_type, field_comment, source_field_name,
-                                       source_field_type, is_meta_field,
+                                       source_field_type, is_meta_field, field_format,
                                        rank_num, is_deleted)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sinkId,jdbcType=INTEGER}, #{sinkType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
                 #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR}, #{sourceFieldName,jdbcType=VARCHAR},
                 #{sourceFieldType,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT},
-                #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+                #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
     <insert id="insertAll">
         insert into stream_sink_field (
@@ -60,7 +61,7 @@
         sink_type, field_name,
         field_type, field_comment,
         source_field_name, source_field_type,
-        is_meta_field, rank_num, is_deleted
+        is_meta_field, field_format, rank_num, is_deleted
         )
         values
         <foreach collection="list" index="index" item="item" separator=",">
@@ -70,7 +71,8 @@
             #{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
             #{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
-            #{item.isMetaField,jdbcType=SMALLINT}, #{item.rankNum,jdbcType=SMALLINT},
+            #{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
+            #{item.rankNum,jdbcType=SMALLINT},
             #{item.isDeleted,jdbcType=INTEGER}
             )
         </foreach>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
index a14276e..fb3512d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort.util;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.MetaFieldType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
@@ -76,7 +77,7 @@ public class FieldInfoUtils {
         // Set source field info list.
         for (InlongStreamFieldInfo field : streamFieldList) {
             FieldInfo sourceField = getFieldInfo(field.getFieldName(), field.getFieldType(),
-                    field.getIsMetaField() == 1);
+                    field.getIsMetaField() == 1, field.getFieldFormat());
             sourceFields.add(sourceField);
         }
 
@@ -84,11 +85,11 @@ public class FieldInfoUtils {
         // Get sink field info list, if the field name equals to build-in field, new a build-in field info
         for (SinkFieldResponse field : fieldList) {
             FieldInfo sinkField = getFieldInfo(field.getFieldName(), field.getFieldType(),
-                    field.getIsMetaField() == 1);
+                    field.getIsMetaField() == 1, field.getFieldFormat());
             sinkFields.add(sinkField);
 
             FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(),
-                    field.getSourceFieldType(), field.getIsMetaField() == 1);
+                    field.getSourceFieldType(), field.getIsMetaField() == 1, field.getFieldFormat());
             mappingUnitList.add(new FieldMappingUnit(sourceField, sinkField));
         }
 
@@ -100,10 +101,10 @@ public class FieldInfoUtils {
      *
      * @apiNote If the field name equals to build-in field, new a build-in field info
      */
-    private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin) {
+    private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin, String format) {
         FieldInfo fieldInfo;
         BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
-        FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
+        FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase(), format);
         if (isBuiltin && builtInField != null) {
             fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
         } else {
@@ -139,12 +140,22 @@ public class FieldInfoUtils {
     }
 
     /**
-     * Get the FieldFormat of Sort according to type string
+     * Get the FieldFormat of Sort according to type string, without a custom field format
      *
      * @param type type string
      * @return Sort field format instance
      */
     public static FormatInfo convertFieldFormat(String type) {
+        return convertFieldFormat(type, null);
+    }
+
+    /**
+     * Get the FieldFormat of Sort according to type string and format of field
+     *
+     * @param type type string
+     * @return Sort field format instance
+     */
+    public static FormatInfo convertFieldFormat(String type, String format) {
         FormatInfo formatInfo;
         FieldType fieldType = FieldType.forName(type);
         switch (fieldType) {
@@ -176,13 +187,25 @@ public class FieldInfoUtils {
                 formatInfo = new DecimalFormatInfo();
                 break;
             case DATE:
-                formatInfo = new DateFormatInfo();
+                if (StringUtils.isNotBlank(format)) {
+                    formatInfo = new DateFormatInfo(convertToSortFormat(format));
+                } else {
+                    formatInfo = new DateFormatInfo();
+                }
                 break;
             case TIME:
-                formatInfo = new TimeFormatInfo();
+                if (StringUtils.isNotBlank(format)) {
+                    formatInfo = new TimeFormatInfo(convertToSortFormat(format));
+                } else {
+                    formatInfo = new TimeFormatInfo();
+                }
                 break;
             case TIMESTAMP:
-                formatInfo = new TimestampFormatInfo();
+                if (StringUtils.isNotBlank(format)) {
+                    formatInfo = new TimestampFormatInfo(convertToSortFormat(format));
+                } else {
+                    formatInfo = new TimestampFormatInfo();
+                }
                 break;
             case BINARY:
             case FIXED:
@@ -195,4 +218,27 @@ public class FieldInfoUtils {
         return formatInfo;
     }
 
+    /**
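+     * Convert a Manager field format name to the format string used by Sort
+     *
+     * @param format The field format, a unit name such as MICROSECONDS or a custom pattern
+     * @return MICROS or MILLIS for the matching unit names, otherwise the format unchanged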
+     */
+    private static String convertToSortFormat(String format) {
+        String sortFormat = format;
+        switch (format) {
+            case "MICROSECONDS":
+                sortFormat = "MICROS";
+                break;
+            case "MILLISECONDS":
+                sortFormat = "MILLIS";
+                break;
+            case "SECONDS":
+                sortFormat = "SECONDS";
+                break;
+            default:
+        }
+        return sortFormat;
+    }
+
 }
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 9416ad0..9598450 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -405,6 +405,7 @@ CREATE TABLE `inlong_stream_field`
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`        varchar(50)  NOT NULL COMMENT 'Field format',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
@@ -649,6 +650,7 @@ CREATE TABLE `stream_sink_field`
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`      varchar(50)  NOT NULL COMMENT 'Field format',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 7b6b41d..866a6cb 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -427,6 +427,7 @@ CREATE TABLE `inlong_stream_field`
     `field_type`          varchar(20)  NOT NULL COMMENT 'field type',
     `field_comment`       varchar(50)  DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`       smallint(3)  DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`        varchar(50)  NOT NULL COMMENT 'Field format',
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
@@ -681,6 +682,7 @@ CREATE TABLE `stream_sink_field`
     `field_type`        varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`     varchar(2000) DEFAULT NULL COMMENT 'Field description',
     `is_meta_field`     smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `field_format`      varchar(50)  NOT NULL COMMENT 'Field format',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)