You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/01 06:02:59 UTC
[incubator-inlong] branch master updated: [INLONG-4458][Sort][Manager] Unify the meta field naming (#4460)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 87709bb70 [INLONG-4458][Sort][Manager] Unify the meta field naming (#4460)
87709bb70 is described below
commit 87709bb706222620f40c66d7db44fe4ad7c529b1
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Wed Jun 1 14:02:55 2022 +0800
[INLONG-4458][Sort][Manager] Unify the meta field naming (#4460)
---
.../manager/client/api/impl/InlongStreamImpl.java | 3 +-
.../inlong/manager/common/enums/MetaFieldType.java | 103 -----------
.../manager/common/pojo/stream/StreamField.java | 20 +-
.../dao/entity/InlongStreamFieldEntity.java | 1 +
.../manager/dao/entity/StreamSinkFieldEntity.java | 1 +
.../dao/entity/StreamSourceFieldEntity.java | 2 +
.../dao/entity/StreamTransformFieldEntity.java | 2 +
.../mappers/InlongStreamFieldEntityMapper.xml | 19 +-
.../mappers/StreamSinkFieldEntityMapper.xml | 17 +-
.../mappers/StreamSourceFieldEntityMapper.xml | 51 ++++--
.../mappers/StreamTransformFieldEntityMapper.xml | 110 ++++++-----
.../manager/service/sort/util/FieldInfoUtils.java | 93 +++-------
.../main/resources/sql/apache_inlong_manager.sql | 4 +
.../manager-web/sql/apache_inlong_manager.sql | 4 +
.../inlong/sort/protocol/BuiltInFieldInfo.java | 173 ------------------
.../org/apache/inlong/sort/protocol/FieldInfo.java | 5 +-
.../apache/inlong/sort/protocol/MetaFieldInfo.java | 11 +-
.../protocol/transformation/FunctionParam.java | 4 +-
.../inlong/sort/protocol/BuiltInFieldInfoTest.java | 32 ----
.../apache/inlong/sort/protocol/FieldInfoTest.java | 2 +-
.../inlong/sort/parser/impl/FlinkSqlParser.java | 202 ---------------------
.../apache/inlong/sort/parser/AllMigrateTest.java | 8 +-
.../sort/parser/DistinctNodeSqlParseTest.java | 11 +-
.../inlong/sort/parser/MetaFieldSyncTest.java | 138 +++++---------
24 files changed, 233 insertions(+), 783 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index cc3e72cf6..ea1770e0a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -85,7 +85,8 @@ public class InlongStreamImpl implements InlongStream {
fieldInfo.getFieldComment(),
fieldInfo.getFieldValue(),
fieldInfo.getIsMetaField(),
- fieldInfo.getFieldFormat()
+ fieldInfo.getMetaFieldName(),
+ fieldInfo.getOriginNodeName()
)
).collect(Collectors.toList());
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
deleted file mode 100644
index 3a52e41a0..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.enums;
-
-import lombok.Getter;
-
-@Getter
-public enum MetaFieldType {
-
- /**
- * database
- */
- DATABASE("database", "meta field database used in canal json or mysql binlog and so on"),
-
- /**
- * processing_time
- */
- PROCESSING_TIME("processing_time", "meta field processing_time describe such moment the record be processed"),
-
- /**
- * data_time
- */
- DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlog and so on"),
-
- /**
- * table
- */
- TABLE("table", "meta field table used in canal json or mysql binlog and so on"),
-
- /**
- * event_time
- */
- EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlog and so on"),
-
- /**
- * is_ddl
- */
- IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlog and so on"),
-
- /**
- * event_type
- */
- EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlog and so on"),
-
- /**
- * data
- */
- MYSQL_DATA("data", "MySQL binlog data Row"),
-
- /**
- * update_before
- */
- UPDATE_BEFORE("update_before", "The value of the field before update"),
-
- /**
- * Batch id of binlog
- */
- BATCH_ID("batch_id", "Batch id of binlog"),
-
- /**
- * sql_type
- */
- SQL_TYPE("sql_type", "Mapping of sql_type table fields to java data type IDs"),
-
- /**
- * ts
- */
- TS("ts", "The current time when the ROW was received and processed"),
-
- /**
- * mysql_type
- */
- MYSQL_TYPE("mysql_type", "The table structure"),
-
- /**
- * pk_names
- */
- PK_NAMES("pk_names", "Primary key field name");
-
- private final String name;
-
- private final String description;
-
- MetaFieldType(String name, String description) {
- this.name = name;
- this.description = description;
- }
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
index 02a5e9358..6d386f71b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
@@ -22,8 +22,6 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.MetaFieldType;
/**
* Stream filed, including field name, field type, etc.
@@ -34,12 +32,6 @@ import org.apache.inlong.manager.common.enums.MetaFieldType;
@ApiModel("Stream field configuration")
public class StreamField {
- public static final StreamField PROCESSING_TIME = new StreamField(
- 100,
- FieldType.BIGINT.toString(),
- MetaFieldType.PROCESSING_TIME.getName(),
- null, null, 1);
-
@ApiModelProperty("Field index")
private Integer id;
@@ -70,6 +62,9 @@ public class StreamField {
@ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
private Integer isMetaField = 0;
+ @ApiModelProperty(value = "Meta field name")
+ private String metaFieldName;
+
@ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
+ " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used for time format")
private String fieldFormat;
@@ -92,22 +87,25 @@ public class StreamField {
}
public StreamField(int index, String fieldType, String fieldName, String fieldComment, String fieldValue,
- Integer isMetaField) {
+ Integer isMetaField, String metaFieldName) {
this(index, fieldType, fieldName, fieldComment, fieldValue);
this.isMetaField = isMetaField;
+ this.metaFieldName = metaFieldName;
}
public StreamField(int index, String fieldType, String fieldName, String fieldComment, String fieldValue,
- Integer isMetaField, String originNodeName) {
+ Integer isMetaField, String metaFieldName, String originNodeName) {
this(index, fieldType, fieldName, fieldComment, fieldValue);
this.isMetaField = isMetaField;
+ this.metaFieldName = metaFieldName;
this.originNodeName = originNodeName;
}
public StreamField(int index, String fieldType, String fieldName, String fieldComment, String fieldValue,
- Integer isMetaField, String originNodeName, String originFieldName) {
+ Integer isMetaField, String metaFieldName, String originNodeName, String originFieldName) {
this(index, fieldType, fieldName, fieldComment, fieldValue);
this.isMetaField = isMetaField;
+ this.metaFieldName = metaFieldName;
this.originNodeName = originNodeName;
this.originFieldName = originFieldName;
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
index e2a1f8ecb..ffd3fff9b 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
@@ -38,6 +38,7 @@ public class InlongStreamFieldEntity implements Serializable {
private String fieldType;
private String fieldComment;
private Integer isMetaField;
+ private String metaFieldName;
private String fieldFormat;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 1482c4f07..31e9a9275 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -43,6 +43,7 @@ public class StreamSinkFieldEntity implements Serializable {
private String extParams;
private Integer isMetaField;
+ private String metaFieldName;
private String fieldFormat;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index 665812386..5a766936c 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -48,6 +48,8 @@ public class StreamSourceFieldEntity implements Serializable {
private Integer isMetaField;
+ private String metaFieldName;
+
private String fieldFormat;
private Integer rankNum;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index 49ed97fa1..aa9d1d5d6 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -49,6 +49,8 @@ public class StreamTransformFieldEntity implements Serializable {
private Integer isMetaField;
+ private String metaFieldName;
+
private String fieldFormat;
private Integer rankNum;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index a335d932b..89abb39dc 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -31,25 +31,26 @@
<result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
<result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
<result column="rank_num" jdbcType="INTEGER" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
- id
- , inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
- pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num, is_deleted
+ id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value, pre_expression,
+ field_type, field_comment, is_meta_field, meta_field_name, field_format, rank_num, is_deleted
</sql>
<insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
insert into inlong_stream_field (id, inlong_group_id, inlong_stream_id,
is_predefined_field, field_name, field_value,
pre_expression, field_type, field_comment,
- is_meta_field, field_format, rank_num, is_deleted)
+ is_meta_field, meta_field_name, field_format,
+ rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{isPredefinedField,jdbcType=INTEGER}, #{fieldName,jdbcType=VARCHAR}, #{fieldValue,jdbcType=VARCHAR},
#{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
- #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+ #{isMetaField,jdbcType=SMALLINT}, #{metaFieldName,jdbcType=VARCHAR}, #{fieldFormat,jdbcType=VARCHAR},
#{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<!-- Bulk insert, update if it exists -->
@@ -57,14 +58,15 @@
insert into inlong_stream_field (
id, inlong_group_id, inlong_stream_id, is_predefined_field,
field_name, field_value, pre_expression, field_type,
- field_comment, is_meta_field, field_format, rank_num, is_deleted
+ field_comment, is_meta_field, meta_field_name, field_format,
+ rank_num, is_deleted
)
values
<foreach collection="fieldList" index="index" item="item" separator=",">
(
#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
#{item.fieldName}, #{item.fieldValue}, #{item.preExpression}, #{item.fieldType},
- #{item.fieldComment}, #{item.isMetaField}, #{item.fieldFormat},
+ #{item.fieldComment}, #{item.isMetaField}, #{item.metaFieldName}, #{item.fieldFormat},
#{item.rankNum}, #{item.isDeleted}
)
</foreach>
@@ -78,6 +80,7 @@
pre_expression = values(pre_expression),
field_type = values(field_type),
is_meta_field = values(is_meta_field),
+ meta_field_name = values(meta_field_name),
field_format = values(field_format),
field_comment = values(field_comment),
rank_num = values(rank_num),
@@ -123,4 +126,4 @@
and is_deleted = 0
</delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 73ae39791..5587a8214 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -33,13 +33,14 @@
<result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
<result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
id, sink_id, field_name, field_type, field_comment, source_field_name, source_field_type,
- ext_params, is_meta_field, field_format, rank_num, is_deleted
+ ext_params, is_meta_field, meta_field_name, field_format, rank_num, is_deleted
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -49,7 +50,8 @@
sink_type, field_name,
field_type, field_comment,
source_field_name, source_field_type,
- ext_params, is_meta_field, field_format,
+ ext_params, is_meta_field,
+ meta_field_name, field_format,
rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR}, #{sinkId,jdbcType=INTEGER},
@@ -57,8 +59,8 @@
#{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
#{sourceFieldName,jdbcType=VARCHAR}, #{sourceFieldType,jdbcType=VARCHAR},
#{extParams,jdbcType=LONGVARCHAR}, #{isMetaField,jdbcType=SMALLINT},
- #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT},
- #{isDeleted,jdbcType=INTEGER})
+ #{metaFieldName,jdbcType=VARCHAR}, #{fieldFormat,jdbcType=VARCHAR},
+ #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertAll">
insert into stream_sink_field (
@@ -67,7 +69,8 @@
sink_type, field_name,
field_type, field_comment,
source_field_name, source_field_type,
- ext_params, is_meta_field, field_format,
+ ext_params, is_meta_field,
+ meta_field_name, field_format,
rank_num, is_deleted
)
values
@@ -78,8 +81,8 @@
#{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
#{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
- #{item.extParams,jdbcType=LONGVARCHAR},
- #{item.isMetaField,jdbcType=SMALLINT}, #{item.fieldFormat,jdbcType=VARCHAR},
+ #{item.extParams,jdbcType=LONGVARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
+ #{item.metaFieldName,jdbcType=VARCHAR}, #{item.fieldFormat,jdbcType=VARCHAR},
#{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
)
</foreach>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index 97e60414c..0bf9acd9b 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -32,15 +32,14 @@
<result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
<result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
- id
- , inlong_group_id, inlong_stream_id, source_id, source_type, field_name, field_value,
- pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num,
- is_deleted
+ id, inlong_group_id, inlong_stream_id, source_id, source_type, field_name, field_value, pre_expression,
+ field_type, field_comment, is_meta_field, meta_field_name, field_format, rank_num, is_deleted
</sql>
<select id="selectBySourceId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
select
@@ -59,13 +58,13 @@
insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
source_id, source_type, field_name,
field_value, pre_expression, field_type,
- field_comment, is_meta_field, field_format,
- rank_num, is_deleted)
+ field_comment, is_meta_field, meta_field_name,
+ field_format, rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
#{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
- #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{metaFieldName,jdbcType=VARCHAR},
+ #{fieldFormat,jdbcType=VARCHAR}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
@@ -104,6 +103,9 @@
<if test="isMetaField != null">
is_meta_field,
</if>
+ <if test="metaFieldName != null">
+ meta_field_name,
+ </if>
<if test="fieldFormat != null">
field_format,
</if>
@@ -148,6 +150,9 @@
<if test="isMetaField != null">
#{isMetaField,jdbcType=SMALLINT},
</if>
+ <if test="metaFieldName != null">
+ #{metaFieldName,jdbcType=VARCHAR},
+ </if>
<if test="fieldFormat != null">
#{fieldFormat,jdbcType=VARCHAR},
</if>
@@ -193,6 +198,9 @@
<if test="isMetaField != null">
is_meta_field = #{isMetaField,jdbcType=SMALLINT},
</if>
+ <if test="metaFieldName != null">
+ meta_field_name = #{metaFieldName,jdbcType=VARCHAR},
+ </if>
<if test="fieldFormat != null">
field_format = #{fieldFormat,jdbcType=VARCHAR},
</if>
@@ -217,6 +225,7 @@
field_type = #{fieldType,jdbcType=VARCHAR},
field_comment = #{fieldComment,jdbcType=VARCHAR},
is_meta_field = #{isMetaField,jdbcType=SMALLINT},
+ meta_field_name = #{metaFieldName,jdbcType=VARCHAR},
field_format = #{fieldFormat,jdbcType=VARCHAR},
rank_num = #{rankNum,jdbcType=SMALLINT},
is_deleted = #{isDeleted,jdbcType=INTEGER}
@@ -224,21 +233,23 @@
</update>
<insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
- insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
- source_id, source_type, field_name,
- field_value, pre_expression, field_type,
- field_comment, is_meta_field, field_format,
- rank_num, is_deleted)
+ insert into stream_source_field (id, inlong_group_id,
+ inlong_stream_id, source_id,
+ source_type, field_name,
+ field_value, pre_expression,
+ field_type, field_comment,
+ is_meta_field, meta_field_name,
+ field_format, rank_num, is_deleted)
values
<foreach collection="list" index="index" item="item" separator=",">
(#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
- #{item.inlongStreamId,jdbcType=VARCHAR},
- #{item.sourceId,jdbcType=INTEGER}, #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
+ #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.sourceId,jdbcType=INTEGER},
+ #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
- #{item.fieldType,jdbcType=VARCHAR},
- #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
- #{item.fieldFormat,jdbcType=VARCHAR},
- #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
+ #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
+ #{item.isMetaField,jdbcType=SMALLINT}, #{item.metaFieldName,jdbcType=VARCHAR},
+ #{item.fieldFormat,jdbcType=VARCHAR}, #{item.rankNum,jdbcType=SMALLINT},
+ #{item.isDeleted,jdbcType=INTEGER})
</foreach>
</insert>
@@ -247,4 +258,4 @@
from stream_source_field
where source_id = #{sourceId,jdbcType=INTEGER}
</delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index 015777dce..f35c28675 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -32,17 +32,17 @@
<result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="meta_field_name" jdbcType="VARCHAR" property="metaFieldName"/>
<result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
- <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
- <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
<result column="origin_node_name" jdbcType="VARCHAR" property="originNodeName"/>
<result column="origin_field_name" jdbcType="VARCHAR" property="originFieldName"/>
+ <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
- id
- , inlong_group_id, inlong_stream_id, transform_id, transform_type, field_name,
- field_value, pre_expression, field_type, field_comment, is_meta_field, field_format,
- rank_num, is_deleted, origin_node_name, origin_field_name
+ id, inlong_group_id, inlong_stream_id, transform_id, transform_type, field_name,
+ field_value, pre_expression, field_type, field_comment, is_meta_field, meta_field_name,
+ field_format, origin_node_name, origin_field_name, rank_num, is_deleted
</sql>
<select id="selectByTransformId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
select
@@ -77,15 +77,17 @@
insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
transform_id, transform_type, field_name,
field_value, pre_expression, field_type,
- field_comment, is_meta_field, field_format,
- rank_num, is_deleted, origin_node_name,
- origin_field_name)
+ field_comment, is_meta_field,
+ meta_field_name, field_format,
+ origin_node_name, origin_field_name,
+ rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{transformId,jdbcType=INTEGER}, #{transformType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
#{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
- #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER}, #{originNodeName,jdbcType=VARCHAR},
- #{originFieldName,jdbcType=VARCHAR})
+ #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT},
+ #{metaFieldName,jdbcType=VARCHAR}, #{fieldFormat,jdbcType=VARCHAR},
+ #{originNodeName,jdbcType=VARCHAR}, #{originFieldName,jdbcType=VARCHAR},
+ #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
@@ -124,21 +126,24 @@
<if test="isMetaField != null">
is_meta_field,
</if>
+ <if test="metaFieldName != null">
+ meta_field_name,
+ </if>
<if test="fieldFormat != null">
field_format,
</if>
- <if test="rankNum != null">
- rank_num,
- </if>
- <if test="isDeleted != null">
- is_deleted,
- </if>
<if test="originNodeName != null">
origin_node_name,
</if>
<if test="originFieldName != null">
origin_field_name,
</if>
+ <if test="rankNum != null">
+ rank_num,
+ </if>
+ <if test="isDeleted != null">
+ is_deleted,
+ </if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -174,21 +179,24 @@
<if test="isMetaField != null">
#{isMetaField,jdbcType=SMALLINT},
</if>
+ <if test="metaFieldName != null">
+ #{metaFieldName,jdbcType=VARCHAR},
+ </if>
<if test="fieldFormat != null">
#{fieldFormat,jdbcType=VARCHAR},
</if>
- <if test="rankNum != null">
- #{rankNum,jdbcType=SMALLINT},
- </if>
- <if test="isDeleted != null">
- #{isDeleted,jdbcType=INTEGER},
- </if>
<if test="originNodeName != null">
#{originNodeName,jdbcType=VARCHAR},
</if>
<if test="originFieldName != null">
#{originFieldName,jdbcType=VARCHAR},
</if>
+ <if test="rankNum != null">
+ #{rankNum,jdbcType=SMALLINT},
+ </if>
+ <if test="isDeleted != null">
+ #{isDeleted,jdbcType=INTEGER},
+ </if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective"
@@ -225,21 +233,24 @@
<if test="isMetaField != null">
is_meta_field = #{isMetaField,jdbcType=SMALLINT},
</if>
+ <if test="metaFieldName != null">
+ meta_field_name = #{metaFieldName,jdbcType=VARCHAR},
+ </if>
<if test="fieldFormat != null">
field_format = #{fieldFormat,jdbcType=VARCHAR},
</if>
- <if test="rankNum != null">
- rank_num = #{rankNum,jdbcType=SMALLINT},
- </if>
- <if test="isDeleted != null">
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </if>
<if test="originNodeName != null">
origin_node_name = #{originNodeName,jdbcType=VARCHAR},
</if>
<if test="originFieldName != null">
origin_field_name = #{originFieldName,jdbcType=VARCHAR},
</if>
+ <if test="rankNum != null">
+ rank_num = #{rankNum,jdbcType=SMALLINT},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
@@ -255,33 +266,36 @@
field_type = #{fieldType,jdbcType=VARCHAR},
field_comment = #{fieldComment,jdbcType=VARCHAR},
is_meta_field = #{isMetaField,jdbcType=SMALLINT},
+ meta_field_name = #{metaFieldName,jdbcType=VARCHAR},
field_format = #{fieldFormat,jdbcType=VARCHAR},
- rank_num = #{rankNum,jdbcType=SMALLINT},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
origin_node_name = #{originNodeName,jdbcType=VARCHAR},
- origin_field_name = #{originFieldName,jdbcType=VARCHAR}
+ origin_field_name = #{originFieldName,jdbcType=VARCHAR},
+ rank_num = #{rankNum,jdbcType=SMALLINT},
+ is_deleted = #{isDeleted,jdbcType=INTEGER}
where id = #{id,jdbcType=INTEGER}
</update>
<insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
- insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
- transform_id, transform_type, field_name,
- field_value, pre_expression, field_type,
- field_comment, is_meta_field, field_format,
- rank_num, is_deleted, origin_node_name,
- origin_field_name)
+ insert into stream_transform_field (id, inlong_group_id,
+ inlong_stream_id, transform_id,
+ transform_type, field_name,
+ field_value, pre_expression,
+ field_type, field_comment,
+ is_meta_field, meta_field_name,
+ field_format, origin_node_name,
+ origin_field_name,
+ rank_num, is_deleted)
values
<foreach collection="list" index="index" item="item" separator=",">
(#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
- #{item.inlongStreamId,jdbcType=VARCHAR},
- #{item.transformId,jdbcType=INTEGER}, #{item.transformType,jdbcType=VARCHAR},
- #{item.fieldName,jdbcType=VARCHAR},
+ #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.transformId,jdbcType=INTEGER},
+ #{item.transformType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
- #{item.fieldType,jdbcType=VARCHAR},
- #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
- #{item.fieldFormat,jdbcType=VARCHAR},
- #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER},
- #{item.originNodeName,jdbcType=VARCHAR}, #{item.originFieldName,jdbcType=VARCHAR})
+ #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
+ #{item.isMetaField,jdbcType=SMALLINT}, #{item.metaFieldName,jdbcType=VARCHAR},
+ #{item.fieldFormat,jdbcType=VARCHAR}, #{item.originNodeName,jdbcType=VARCHAR},
+ #{item.originFieldName,jdbcType=VARCHAR},
+ #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
</foreach>
</insert>
@@ -290,4 +304,4 @@
from stream_transform_field
where transform_id = #{transformId,jdbcType=INTEGER}
</delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index 3f28cadb8..e9859592a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.service.sort.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.MetaFieldType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
@@ -38,15 +37,13 @@ import org.apache.inlong.sort.formats.common.ShortFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimeFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo.MetaField;
import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
/**
* Util for sort field info.
@@ -54,49 +51,27 @@ import java.util.Map;
@Slf4j
public class FieldInfoUtils {
- /**
- * Built in field map, key is field name, value is built in field name
- */
- public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new LinkedHashMap<>();
-
- static {
- BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(), BuiltInField.DATA_TIME);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.DATABASE.getName(), BuiltInField.MYSQL_METADATA_DATABASE);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.TABLE.getName(), BuiltInField.MYSQL_METADATA_TABLE);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TIME.getName(), BuiltInField.MYSQL_METADATA_EVENT_TIME);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(), BuiltInField.MYSQL_METADATA_IS_DDL);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(), BuiltInField.MYSQL_METADATA_EVENT_TYPE);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.PROCESSING_TIME.getName(), BuiltInField.PROCESS_TIME);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.UPDATE_BEFORE.getName(), BuiltInField.METADATA_UPDATE_BEFORE);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.BATCH_ID.getName(), BuiltInField.METADATA_BATCH_ID);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.SQL_TYPE.getName(), BuiltInField.METADATA_SQL_TYPE);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.TS.getName(), BuiltInField.METADATA_TS);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_TYPE.getName(), BuiltInField.METADATA_MYSQL_TYPE);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.PK_NAMES.getName(), BuiltInField.METADATA_PK_NAMES);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(), BuiltInField.MYSQL_METADATA_DATA);
- }
-
public static FieldInfo parseSinkFieldInfo(SinkField sinkField, String nodeId) {
- boolean isBuiltIn = sinkField.getIsMetaField() == 1;
+ boolean isMetaField = sinkField.getIsMetaField() == 1;
FieldInfo fieldInfo = getFieldInfo(sinkField.getFieldName(),
- sinkField.getFieldType(),
- isBuiltIn, sinkField.getFieldFormat());
+ sinkField.getFieldType(), isMetaField, sinkField.getMetaFieldName(),
+ sinkField.getFieldFormat());
fieldInfo.setNodeId(nodeId);
return fieldInfo;
}
public static FieldInfo parseStreamFieldInfo(StreamField streamField, String nodeId) {
- boolean isBuiltIn = streamField.getIsMetaField() == 1;
- FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(), isBuiltIn,
- streamField.getFieldFormat());
+ boolean isMetaField = streamField.getIsMetaField() == 1;
+ FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(),
+ isMetaField, streamField.getMetaFieldName(), streamField.getFieldFormat());
fieldInfo.setNodeId(nodeId);
return fieldInfo;
}
public static FieldInfo parseStreamField(StreamField streamField) {
- boolean isBuiltIn = streamField.getIsMetaField() == 1;
- FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(), isBuiltIn,
- streamField.getFieldFormat());
+ boolean isMetaField = streamField.getIsMetaField() == 1;
+ FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(),
+ isMetaField, streamField.getMetaFieldName(), streamField.getFieldFormat());
fieldInfo.setNodeId(streamField.getOriginNodeName());
return fieldInfo;
}
@@ -112,7 +87,7 @@ public class FieldInfoUtils {
// Set source field info list.
for (StreamField field : streamFieldList) {
FieldInfo sourceField = getFieldInfo(field.getFieldName(), field.getFieldType(),
- field.getIsMetaField() == 1, field.getFieldFormat());
+ field.getIsMetaField() == 1, field.getMetaFieldName(), field.getFieldFormat());
sourceFields.add(sourceField);
}
@@ -120,11 +95,12 @@ public class FieldInfoUtils {
// Get sink field info list, if the field name equals to build-in field, new a build-in field info
for (SinkField field : fieldList) {
FieldInfo sinkField = getFieldInfo(field.getFieldName(), field.getFieldType(),
- field.getIsMetaField() == 1, field.getFieldFormat());
+ field.getIsMetaField() == 1, field.getMetaFieldName(), field.getFieldFormat());
sinkFields.add(sinkField);
if (StringUtils.isNotBlank(field.getSourceFieldName())) {
FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(),
- field.getSourceFieldType(), field.getIsMetaField() == 1, field.getFieldFormat());
+ field.getSourceFieldType(), field.getIsMetaField() == 1,
+ field.getMetaFieldName(), field.getFieldFormat());
mappingUnitList.add(new FieldMappingUnit(sourceField, sinkField));
}
}
@@ -137,27 +113,13 @@ public class FieldInfoUtils {
*
* @apiNote If the field name equals to build-in field, new a build-in field info
*/
- private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin, String format) {
- BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
- FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase(), format);
- if (isBuiltin && builtInField != null) {
- return new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+ private static FieldInfo getFieldInfo(String fieldName, String fieldType,
+ boolean isMetaField, String metaFieldName, String format) {
+ if (isMetaField) { // a meta field is resolved by name only; fieldType and format are not consulted
+ // TODO The meta field needs to be selectable and cannot be filled in by the user
+ return new MetaFieldInfo(fieldName, MetaField.forName(metaFieldName));
} else {
- if (isBuiltin) {
- // Check if fieldName contains buildInFieldName, such as left_database
- // TODO The buildin field needs to be selectable and cannot be filled in by the user
- for (String buildInFieldName : BUILT_IN_FIELD_MAP.keySet()) {
- if (fieldName.contains(buildInFieldName)) {
- builtInField = BUILT_IN_FIELD_MAP.get(buildInFieldName);
- break;
- }
- }
- if (builtInField != null) {
- return new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
- }
- log.warn("Unsupported metadata fieldName={} as the builtInField is null", fieldName);
- }
- return new FieldInfo(fieldName, formatInfo);
+ return new FieldInfo(fieldName, convertFieldFormat(fieldType.toLowerCase(java.util.Locale.ROOT), format));
}
}
@@ -167,18 +129,13 @@ public class FieldInfoUtils {
public static List<FieldMappingUnit> setAllMigrationFieldMapping(List<FieldInfo> sourceFields,
List<FieldInfo> sinkFields) {
List<FieldMappingUnit> mappingUnitList = new ArrayList<>();
- BuiltInFieldInfo dataField = new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
- BuiltInField.MYSQL_METADATA_DATA);
+ MetaFieldInfo dataField = new MetaFieldInfo("data", MetaField.DATA);
sourceFields.add(dataField);
sinkFields.add(dataField);
mappingUnitList.add(new FieldMappingUnit(dataField, dataField));
-
- for (Map.Entry<String, BuiltInField> entry : BUILT_IN_FIELD_MAP.entrySet()) {
- if (entry.getKey().equals("data_time")) {
- continue;
- }
- BuiltInFieldInfo fieldInfo = new BuiltInFieldInfo(entry.getKey(),
- StringFormatInfo.INSTANCE, entry.getValue());
+ // TODO discarded later
+ for (MetaField metaField : MetaField.values()) {
+ MetaFieldInfo fieldInfo = new MetaFieldInfo(metaField.name(), metaField);
sourceFields.add(fieldInfo);
sinkFields.add(fieldInfo);
mappingUnitList.add(new FieldMappingUnit(fieldInfo, fieldInfo));
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 13ad0d454..e20437a39 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -379,6 +379,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -595,6 +596,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -619,6 +621,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -646,6 +649,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`ext_params` text COMMENT 'Field ext params',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index b0e06ec02..fe68bba78 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -405,6 +405,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -630,6 +631,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -654,6 +656,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -682,6 +685,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
`ext_params` text COMMENT 'Field ext params',
`is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `meta_field_name` varchar(20) DEFAULT NULL COMMENT 'Meta field name',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
deleted file mode 100644
index d1506ad54..000000000
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.protocol;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.inlong.sort.formats.common.FormatInfo;
-
-/**
- * built-in field info.
- */
-@Deprecated
-public class BuiltInFieldInfo extends FieldInfo {
-
- private static final long serialVersionUID = -3436204467879205139L;
-
- @JsonProperty("builtinField")
- private final BuiltInField builtInField;
-
- @JsonCreator
- public BuiltInFieldInfo(
- @JsonProperty("name") String name,
- @JsonProperty("nodeId") String nodeId,
- @JsonProperty("formatInfo") FormatInfo formatInfo,
- @JsonProperty("builtinField") BuiltInField builtInField) {
- super(name, nodeId, formatInfo);
- this.builtInField = builtInField;
- }
-
- public BuiltInFieldInfo(
- @JsonProperty("name") String name,
- @JsonProperty("formatInfo") FormatInfo formatInfo,
- @JsonProperty("builtinField") BuiltInField builtInField) {
- super(name, formatInfo);
- this.builtInField = builtInField;
- }
-
- @JsonProperty("builtinField")
- public BuiltInField getBuiltInField() {
- return builtInField;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- BuiltInFieldInfo that = (BuiltInFieldInfo) o;
- return builtInField == that.builtInField
- && super.equals(that);
- }
-
- public enum BuiltInField {
- /**
- * The event time of flink
- */
- @Deprecated
- DATA_TIME,
- /**
- * The process time of flink
- */
- PROCESS_TIME,
- /**
- * The name of the database containing this Row
- * It is deprecated and can be replaced by ${@link BuiltInField#DATABASE_NAME}
- * and will be removed in a future version.
- */
- @Deprecated
- MYSQL_METADATA_DATABASE,
- /**
- * The name of the table containing this Row
- * It is deprecated and can be replaced by ${@link BuiltInField#TABLE_NAME}
- * and will be removed in a future version.
- */
- @Deprecated
- MYSQL_METADATA_TABLE,
- /**
- * The time when the Row made changes in the database.
- * It is deprecated and can be replaced by ${@link BuiltInField#OP_TS}
- * and will be removed in a future version.
- */
- @Deprecated
- MYSQL_METADATA_EVENT_TIME,
- /**
- * Name of the schema that contain the row.
- */
- SCHEMA_NAME,
- /**
- * Name of the database that contain the row.
- */
- DATABASE_NAME,
- /**
- * Name of the table that contain the row.
- */
- TABLE_NAME,
- /**
- * It indicates the time that the change was made in the database.
- * If the record is read from snapshot of the table instead of the change stream, the value is always 0
- */
- OP_TS,
- /**
- * Whether the DDL statement
- */
- IS_DDL,
- /**
- * Whether the DDL statement
- * It is deprecated and can be replaced by ${@link BuiltInField#IS_DDL}
- * and will be removed in a future version.
- */
- @Deprecated
- MYSQL_METADATA_IS_DDL,
- /**
- * Type of database operation, such as INSERT/DELETE, etc.
- */
- OP_TYPE,
- /**
- * Type of database operation, such as INSERT/DELETE, etc.
- * It is deprecated and can be replaced by ${@link BuiltInField#OP_TYPE}
- * and will be removed in a future version.
- */
- @Deprecated
- MYSQL_METADATA_EVENT_TYPE,
- /**
- * MySQL binlog data Row
- */
- MYSQL_METADATA_DATA,
- /**
- * The value of the field before update
- */
- METADATA_UPDATE_BEFORE,
- /**
- * Batch id of binlog
- */
- METADATA_BATCH_ID,
- /**
- * Mapping of sql_type table fields to java data type IDs
- */
- METADATA_SQL_TYPE,
- /**
- * The current time when the ROW was received and processed
- */
- METADATA_TS,
- /**
- * The table structure
- */
- METADATA_MYSQL_TYPE,
- /**
- * Primary key field name
- */
- METADATA_PK_NAMES
- }
-}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 453a049b0..3ff8a2659 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -38,9 +38,8 @@ import java.io.Serializable;
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
- @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
- @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
+ @JsonSubTypes.Type(value = FieldInfo.class, name = "field"),
+ @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField")
})
@Data
public class FieldInfo implements FunctionParam, Serializable {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
index bd3d389d6..4bfac5512 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
@@ -122,6 +122,15 @@ public class MetaFieldInfo extends FieldInfo {
/**
* Primary key field name. Currently, it is used for MySQL database.
*/
- PK_NAMES
+ PK_NAMES;
+
+ public static MetaField forName(String name) {
+ // Exact, case-sensitive comparison against each constant's name(); a null name matches none.
+ return java.util.Arrays.stream(values())
+ .filter(candidate -> candidate.name().equals(name))
+ .findFirst()
+ .orElseThrow(() -> new UnsupportedOperationException(
+ String.format("Unsupported MetaField=%s for Inlong", name)));
+ }
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index b1ef83e54..f72a9d128 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.protocol.transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
@@ -62,8 +61,7 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
- @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin"),
+ @JsonSubTypes.Type(value = FieldInfo.class, name = "field"),
@JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
@JsonSubTypes.Type(value = ConstantParam.class, name = "constant"),
@JsonSubTypes.Type(value = TimeUnitConstantParam.class, name = "timeUnitConstant"),
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
deleted file mode 100644
index 423a64de4..000000000
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/BuiltInFieldInfoTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.protocol;
-
-import org.apache.inlong.sort.SerializeBaseTest;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-
-/**
- * Test for {@link BuiltInFieldInfo}
- */
-public class BuiltInFieldInfoTest extends SerializeBaseTest<BuiltInFieldInfo> {
-
- @Override
- public BuiltInFieldInfo getTestObject() {
- return new BuiltInFieldInfo("f1", StringFormatInfo.INSTANCE, BuiltInFieldInfo.BuiltInField.DATA_TIME);
- }
-}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index e2ed364ec..1e2553eb4 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -43,7 +43,7 @@ public class FieldInfoTest extends SerializeBaseTest<FieldInfo> {
FieldInfo fieldInfo = new FieldInfo("field_name", StringFormatInfo.INSTANCE);
fieldInfo.setNodeId("1");
ObjectMapper objectMapper = new ObjectMapper();
- String fieldInfoStr = "{\"type\":\"base\",\"name\":\"field_name\","
+ String fieldInfoStr = "{\"type\":\"field\",\"name\":\"field_name\","
+ "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
FieldInfo expected = objectMapper.readValue(fieldInfoStr, FieldInfo.class);
assertEquals(expected, fieldInfo);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 8ef086c6e..7046bcc24 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -25,8 +25,6 @@ import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.MetaFieldInfo;
@@ -670,9 +668,6 @@ public class FlinkSqlParser implements Parser {
if (field instanceof MetaFieldInfo) {
MetaFieldInfo metaFieldInfo = (MetaFieldInfo) field;
parseMetaField(node, metaFieldInfo, sb);
- } else if (field instanceof BuiltInFieldInfo) {
- BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) field;
- parseMetaField(node, builtInFieldInfo, sb);
} else {
sb.append(TableFormatUtils.deriveLogicalType(field.getFormatInfo()).asSummaryString());
}
@@ -684,27 +679,6 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
- @Deprecated
- private void parseMetaField(Node node, BuiltInFieldInfo metaField, StringBuilder sb) {
- if (metaField.getBuiltInField() == BuiltInField.PROCESS_TIME) {
- sb.append(" AS PROCTIME()");
- return;
- }
- if (node instanceof MySqlExtractNode) {
- sb.append(parseMySqlExtractNodeMetaField(metaField));
- } else if (node instanceof OracleExtractNode) {
- sb.append(parseOracleExtractNodeMetaField(metaField));
- } else if (node instanceof KafkaExtractNode) {
- sb.append(parseKafkaExtractNodeMetaField(metaField));
- } else if (node instanceof KafkaLoadNode) {
- sb.append(parseKafkaLoadNodeMetaField(metaField));
- } else {
- throw new UnsupportedOperationException(
- String.format("This node:%s does not currently support metadata fields",
- node.getClass().getName()));
- }
- }
-
private void parseMetaField(Node node, MetaFieldInfo metaFieldInfo, StringBuilder sb) {
if (metaFieldInfo.getMetaField() == MetaFieldInfo.MetaField.PROCESS_TIME) {
sb.append(" AS PROCTIME()");
@@ -725,58 +699,6 @@ public class FlinkSqlParser implements Parser {
}
}
- @Deprecated
- private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
- String metaType;
- switch (metaField.getBuiltInField()) {
- case MYSQL_METADATA_TABLE:
- case TABLE_NAME:
- metaType = "STRING METADATA FROM 'value.table'";
- break;
- case MYSQL_METADATA_DATABASE:
- case DATABASE_NAME:
- metaType = "STRING METADATA FROM 'value.database'";
- break;
- case MYSQL_METADATA_EVENT_TIME:
- case OP_TS:
- metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'";
- break;
- case MYSQL_METADATA_EVENT_TYPE:
- case OP_TYPE:
- metaType = "STRING METADATA FROM 'value.op-type'";
- break;
- case MYSQL_METADATA_DATA:
- metaType = "STRING METADATA FROM 'value.data'";
- break;
- case MYSQL_METADATA_IS_DDL:
- case IS_DDL:
- metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
- break;
- case METADATA_TS:
- metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
- break;
- case METADATA_SQL_TYPE:
- metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
- break;
- case METADATA_MYSQL_TYPE:
- metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
- break;
- case METADATA_PK_NAMES:
- metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
- break;
- case METADATA_BATCH_ID:
- metaType = "BIGINT METADATA FROM 'value.batch-id'";
- break;
- case METADATA_UPDATE_BEFORE:
- metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
- break;
- default:
- throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
- metaField.getBuiltInField()));
- }
- return metaType;
- }
-
private String parseKafkaLoadNodeMetaField(MetaFieldInfo metaFieldInfo) {
String metaType;
switch (metaFieldInfo.getMetaField()) {
@@ -823,56 +745,6 @@ public class FlinkSqlParser implements Parser {
return metaType;
}
- @Deprecated
- private String parseKafkaExtractNodeMetaField(BuiltInFieldInfo metaField) {
- String metaType;
- switch (metaField.getBuiltInField()) {
- case MYSQL_METADATA_TABLE:
- case TABLE_NAME:
- metaType = "STRING METADATA FROM 'value.table'";
- break;
- case MYSQL_METADATA_DATABASE:
- case DATABASE_NAME:
- metaType = "STRING METADATA FROM 'value.database'";
- break;
- case METADATA_SQL_TYPE:
- metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
- break;
- case METADATA_PK_NAMES:
- metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
- break;
- case METADATA_TS:
- metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
- break;
- case MYSQL_METADATA_EVENT_TIME:
- case OP_TS:
- metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'";
- break;
- // additional metadata
- case MYSQL_METADATA_EVENT_TYPE:
- case OP_TYPE:
- metaType = "STRING METADATA FROM 'value.op-type'";
- break;
- case MYSQL_METADATA_IS_DDL:
- case IS_DDL:
- metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
- break;
- case METADATA_MYSQL_TYPE:
- metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
- break;
- case METADATA_BATCH_ID:
- metaType = "BIGINT METADATA FROM 'value.batch-id'";
- break;
- case METADATA_UPDATE_BEFORE:
- metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
- break;
- default:
- throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
- metaField.getBuiltInField()));
- }
- return metaType;
- }
-
private String parseKafkaExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
String metaType;
switch (metaFieldInfo.getMetaField()) {
@@ -963,80 +835,6 @@ public class FlinkSqlParser implements Parser {
return metaType;
}
- private String parseMySqlExtractNodeMetaField(BuiltInFieldInfo metaField) {
- String metaType;
- switch (metaField.getBuiltInField()) {
- case MYSQL_METADATA_TABLE:
- case TABLE_NAME:
- metaType = "STRING METADATA FROM 'meta.table_name' VIRTUAL";
- break;
- case MYSQL_METADATA_DATABASE:
- case DATABASE_NAME:
- metaType = "STRING METADATA FROM 'meta.database_name' VIRTUAL";
- break;
- case MYSQL_METADATA_EVENT_TIME:
- case OP_TS:
- metaType = "TIMESTAMP(3) METADATA FROM 'meta.op_ts' VIRTUAL";
- break;
- case MYSQL_METADATA_EVENT_TYPE:
- case OP_TYPE:
- metaType = "STRING METADATA FROM 'meta.op_type' VIRTUAL";
- break;
- case MYSQL_METADATA_DATA:
- metaType = "STRING METADATA FROM 'meta.data' VIRTUAL";
- break;
- case MYSQL_METADATA_IS_DDL:
- case IS_DDL:
- metaType = "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL";
- break;
- case METADATA_TS:
- metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL";
- break;
- case METADATA_SQL_TYPE:
- metaType = "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL";
- break;
- case METADATA_MYSQL_TYPE:
- metaType = "MAP<STRING, STRING> METADATA FROM 'meta.mysql_type' VIRTUAL";
- break;
- case METADATA_PK_NAMES:
- metaType = "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL";
- break;
- case METADATA_BATCH_ID:
- metaType = "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL";
- break;
- case METADATA_UPDATE_BEFORE:
- metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL";
- break;
- default:
- throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
- metaField.getBuiltInField()));
- }
- return metaType;
- }
-
- @Deprecated
- private String parseOracleExtractNodeMetaField(BuiltInFieldInfo metaField) {
- String metaType;
- switch (metaField.getBuiltInField()) {
- case TABLE_NAME:
- metaType = "STRING METADATA FROM 'table_name' VIRTUAL";
- break;
- case SCHEMA_NAME:
- metaType = "STRING METADATA FROM 'schema_name' VIRTUAL";
- break;
- case DATABASE_NAME:
- metaType = "STRING METADATA FROM 'database_name' VIRTUAL";
- break;
- case OP_TS:
- metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL";
- break;
- default:
- throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
- metaField.getBuiltInField()));
- }
- return metaType;
- }
-
private String parseOracleExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
String metaType;
switch (metaFieldInfo.getMetaField()) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 3a6057517..26892eb69 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.parser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.parser.result.ParseResult;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -43,13 +43,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField.MYSQL_METADATA_DATA;
-
public class AllMigrateTest {
private MySqlExtractNode buildAllMigrateExtractNode() {
List<FieldInfo> fields = Arrays.asList(
- new BuiltInFieldInfo("data", new StringFormatInfo(), MYSQL_METADATA_DATA));
+ new MetaFieldInfo("data", MetaFieldInfo.MetaField.DATA));
Map<String, String> option = new HashMap<>();
option.put("append-mode", "true");
option.put("migrate-all", "true");
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index bc538e3dd..f889caf50 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -21,17 +21,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
-import org.apache.inlong.sort.parser.result.ParseResult;
import org.apache.inlong.sort.formats.common.FloatFormatInfo;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
@@ -65,7 +64,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()),
- new BuiltInFieldInfo("proctime", new TimestampFormatInfo(), BuiltInField.PROCESS_TIME));
+ new MetaFieldInfo("proctime", MetaFieldInfo.MetaField.PROCESS_TIME));
return new KafkaExtractNode("1", "kafka_input", fields, null,
null, "topic_input", "localhost:9092",
new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET,
@@ -78,7 +77,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()),
- new BuiltInFieldInfo("proctime", new TimestampFormatInfo(), BuiltInField.PROCESS_TIME));
+ new MetaFieldInfo("proctime", MetaFieldInfo.MetaField.PROCESS_TIME));
WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
new StringConstantParam("1"),
new TimeUnitConstantParam(TimeUnit.SECOND));
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index 045fba0c7..eb71f10c4 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -21,20 +21,16 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
-import org.apache.inlong.sort.parser.result.ParseResult;
-import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
import org.apache.inlong.sort.formats.common.FloatFormatInfo;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
@@ -63,27 +59,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()),
- new BuiltInFieldInfo("database", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
- BuiltInField.METADATA_PK_NAMES),
- new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TIME),
- new BuiltInFieldInfo("event_type", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TYPE),
- new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
- BuiltInField.MYSQL_METADATA_IS_DDL),
- new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
- BuiltInField.METADATA_BATCH_ID),
- new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
- new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
- new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
- new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
- new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+ new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+ new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+ new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+ new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+ new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+ new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+ new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+ new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+ new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+ new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+ new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
);
return new MySqlExtractNode("1", "mysql_input", fields, null, null,
"id", Collections.singletonList("mysql_table"),
@@ -97,27 +83,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()),
- new BuiltInFieldInfo("database", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
- BuiltInField.METADATA_PK_NAMES),
- new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TIME),
- new BuiltInFieldInfo("event_type", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TYPE),
- new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
- BuiltInField.MYSQL_METADATA_IS_DDL),
- new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
- BuiltInField.METADATA_BATCH_ID),
- new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
- new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
- new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
- new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
- new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+ new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+ new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+ new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+ new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+ new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+ new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+ new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+ new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+ new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+ new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+ new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
);
List<FieldRelation> relations = Arrays
.asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
@@ -163,27 +139,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()),
- new BuiltInFieldInfo("database", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
- BuiltInField.METADATA_PK_NAMES),
- new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TIME),
- new BuiltInFieldInfo("event_type", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TYPE),
- new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
- BuiltInField.MYSQL_METADATA_IS_DDL),
- new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
- BuiltInField.METADATA_BATCH_ID),
- new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
- new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
- new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
- new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
- new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+ new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+ new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+ new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+ new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+ new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+ new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+ new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+ new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+ new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+ new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+ new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
);
return new KafkaExtractNode("3", "kafka_input", fields,
null, null, "topic1", "localhost:9092",
@@ -197,27 +163,17 @@ public class MetaFieldSyncTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()),
- new BuiltInFieldInfo("database", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("pk_names", new ArrayFormatInfo(new StringFormatInfo()),
- BuiltInField.METADATA_PK_NAMES),
- new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TIME),
- new BuiltInFieldInfo("event_type", new StringFormatInfo(),
- BuiltInField.MYSQL_METADATA_EVENT_TYPE),
- new BuiltInFieldInfo("isddl", new BooleanFormatInfo(),
- BuiltInField.MYSQL_METADATA_IS_DDL),
- new BuiltInFieldInfo("batch_id", new LongFormatInfo(),
- BuiltInField.METADATA_BATCH_ID),
- new BuiltInFieldInfo("mysql_type", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_MYSQL_TYPE),
- new BuiltInFieldInfo("sql_type", new MapFormatInfo(new StringFormatInfo(),
- new IntFormatInfo()), BuiltInField.METADATA_SQL_TYPE),
- new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(), BuiltInField.METADATA_TS),
- new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
- new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
+ new MetaFieldInfo("database", MetaFieldInfo.MetaField.DATABASE_NAME),
+ new MetaFieldInfo("table", MetaFieldInfo.MetaField.TABLE_NAME),
+ new MetaFieldInfo("pk_names", MetaFieldInfo.MetaField.PK_NAMES),
+ new MetaFieldInfo("event_time", MetaFieldInfo.MetaField.OP_TS),
+ new MetaFieldInfo("event_type", MetaFieldInfo.MetaField.OP_TYPE),
+ new MetaFieldInfo("isddl", MetaFieldInfo.MetaField.IS_DDL),
+ new MetaFieldInfo("batch_id", MetaFieldInfo.MetaField.BATCH_ID),
+ new MetaFieldInfo("mysql_type", MetaFieldInfo.MetaField.MYSQL_TYPE),
+ new MetaFieldInfo("sql_type", MetaFieldInfo.MetaField.SQL_TYPE),
+ new MetaFieldInfo("meta_ts", MetaFieldInfo.MetaField.TS),
+ new MetaFieldInfo("up_before", MetaFieldInfo.MetaField.UPDATE_BEFORE)
);
List<FieldRelation> relations = Arrays
.asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),