You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/24 08:31:54 UTC
[incubator-inlong] branch master updated: [INLONG-3334][Manager] Get source field list from stream field table for Sort (#3344)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 07f612b [INLONG-3334][Manager] Get source field list from stream field table for Sort (#3344)
07f612b is described below
commit 07f612b9afd972f4cb3d10a3988825ffded985bb
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 24 16:31:50 2022 +0800
[INLONG-3334][Manager] Get source field list from stream field table for Sort (#3344)
* [INLONG-3334][Manager] Get source field list from stream field table for Sort
* [INLONG-3334][Manager] Change order by field
* [INLONG-3334][Manager] Change the field comment
---
.../inlong/manager/client/api/SinkField.java | 10 +-
.../inlong/manager/client/api/StreamField.java | 6 +-
.../manager/client/api/impl/InlongStreamImpl.java | 16 +-
.../client/api/util/InlongStreamSinkTransfer.java | 22 +--
.../client/api/util/InlongStreamTransfer.java | 15 +-
.../manager/common/pojo/sink/SinkFieldRequest.java | 4 +-
.../common/pojo/sink/SinkFieldResponse.java | 4 +-
.../common/pojo/stream/InlongStreamFieldInfo.java | 10 +-
.../dao/entity/InlongStreamFieldEntity.java | 4 +-
.../manager/dao/entity/StreamSinkFieldEntity.java | 2 +-
.../dao/mapper/InlongStreamFieldEntityMapper.java | 24 +--
.../mappers/InlongStreamFieldEntityMapper.xml | 209 +++++----------------
.../mappers/StreamSinkFieldEntityMapper.xml | 17 +-
.../manager/service/CommonOperateService.java | 19 +-
.../service/core/impl/AgentServiceImpl.java | 4 +-
.../service/core/impl/InlongStreamServiceImpl.java | 13 +-
.../thirdparty/sort/util/FieldInfoUtils.java | 99 ++++------
.../main/resources/sql/apache_inlong_manager.sql | 27 +--
.../manager-web/sql/apache_inlong_manager.sql | 27 +--
19 files changed, 202 insertions(+), 330 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index a728a72..ecf520f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -20,10 +20,12 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.FieldType;
@Data
+@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@ApiModel("Sink field configuration")
public class SinkField extends StreamField {
@@ -34,14 +36,10 @@ public class SinkField extends StreamField {
@ApiModelProperty("Source field type")
private String sourceFieldType;
- @ApiModelProperty("Is source meta field, 0: no, 1: yes")
- private Integer isSourceMetaField = 0;
-
public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
- String fieldValue, String sourceFieldName, String sourceFieldType, Integer isSourceMetaField) {
- super(index, fieldType, fieldName, fieldComment, fieldValue);
+ String fieldValue, String sourceFieldName, String sourceFieldType, Integer isMetaField) {
+ super(index, fieldType, fieldName, fieldComment, fieldValue, isMetaField);
this.sourceFieldName = sourceFieldName;
this.sourceFieldType = sourceFieldType;
- this.isSourceMetaField = isSourceMetaField;
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index 992c500..445c53b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -31,7 +31,7 @@ import org.apache.inlong.manager.common.enums.FieldType;
public class StreamField {
@ApiModelProperty("Field index")
- private int index;
+ private Integer id;
@ApiModelProperty(value = "Field type", required = true)
private FieldType fieldType;
@@ -44,4 +44,8 @@ public class StreamField {
@ApiModelProperty(value = "Field value for constants")
private String fieldValue;
+
+ @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+ private Integer isMetaField = 0;
+
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index e1b6180..d6a2ad7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -60,12 +60,14 @@ public class InlongStreamImpl extends InlongStream {
List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
this.streamFields = streamFieldInfos.stream()
- .map(streamFieldInfo -> new StreamField(
- streamFieldInfo.getId(),
- FieldType.forName(streamFieldInfo.getFieldType()),
- streamFieldInfo.getFieldName(),
- streamFieldInfo.getFieldComment(),
- streamFieldInfo.getFieldValue())
+ .map(fieldInfo -> new StreamField(
+ fieldInfo.getId(),
+ FieldType.forName(fieldInfo.getFieldType()),
+ fieldInfo.getFieldName(),
+ fieldInfo.getFieldComment(),
+ fieldInfo.getFieldValue(),
+ fieldInfo.getIsMetaField()
+ )
).collect(Collectors.toList());
}
List<SinkResponse> sinkList = fullStreamResponse.getSinkInfo();
@@ -82,7 +84,7 @@ public class InlongStreamImpl extends InlongStream {
List<SourceResponse> sourceList = fullStreamResponse.getSourceInfo();
if (CollectionUtils.isNotEmpty(sourceList)) {
this.streamSources = sourceList.stream()
- .map(sourceResponse -> InlongStreamSourceTransfer.parseStreamSource(sourceResponse))
+ .map(InlongStreamSourceTransfer::parseStreamSource)
.collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource,
(source1, source2) -> {
throw new RuntimeException(
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index dccbdfb..38bf6e7 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -153,7 +153,7 @@ public class InlongStreamSinkTransfer {
sinkFieldResponse.getFieldComment(),
null, sinkFieldResponse.getSourceFieldName(),
sinkFieldResponse.getSourceFieldType(),
- sinkFieldResponse.getIsSourceMetaField())).collect(Collectors.toList());
+ sinkFieldResponse.getIsMetaField())).collect(Collectors.toList());
}
@@ -231,18 +231,18 @@ public class InlongStreamSinkTransfer {
}
private static List<SinkFieldRequest> createSinkFieldRequests(List<SinkField> sinkFields) {
- List<SinkFieldRequest> sinkFieldRequests = Lists.newArrayList();
+ List<SinkFieldRequest> fieldRequestList = Lists.newArrayList();
for (SinkField sinkField : sinkFields) {
- SinkFieldRequest sinkFieldRequest = new SinkFieldRequest();
- sinkFieldRequest.setFieldName(sinkField.getFieldName());
- sinkFieldRequest.setFieldType(sinkField.getFieldType().toString());
- sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
- sinkFieldRequest.setSourceFieldName(sinkField.getSourceFieldName());
- sinkFieldRequest.setSourceFieldType(sinkField.getSourceFieldType());
- sinkFieldRequest.setIsSourceMetaField(sinkField.getIsSourceMetaField());
- sinkFieldRequests.add(sinkFieldRequest);
+ SinkFieldRequest request = new SinkFieldRequest();
+ request.setFieldName(sinkField.getFieldName());
+ request.setFieldType(sinkField.getFieldType().toString());
+ request.setFieldComment(sinkField.getFieldComment());
+ request.setSourceFieldName(sinkField.getSourceFieldName());
+ request.setSourceFieldType(sinkField.getSourceFieldType());
+ request.setIsMetaField(sinkField.getIsMetaField());
+ fieldRequestList.add(request);
}
- return sinkFieldRequests;
+ return fieldRequestList;
}
private static HiveSink parseHiveSink(HiveSinkResponse sinkResponse, StreamSink sink) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 0ca083b..bf68845 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -57,18 +57,17 @@ public class InlongStreamTransfer {
}
public static List<InlongStreamFieldInfo> createStreamFields(
- List<StreamField> fieldList,
- InlongStreamResponse streamInfo) {
- List<InlongStreamFieldInfo> fieldInfos = fieldList.stream().map(streamField -> {
+ List<StreamField> fieldList, InlongStreamResponse streamInfo) {
+ return fieldList.stream().map(field -> {
InlongStreamFieldInfo fieldInfo = new InlongStreamFieldInfo();
fieldInfo.setInlongStreamId(streamInfo.getInlongStreamId());
fieldInfo.setInlongGroupId(streamInfo.getInlongGroupId());
- fieldInfo.setFieldName(streamField.getFieldName());
- fieldInfo.setFieldType(streamField.getFieldType().toString());
- fieldInfo.setFieldComment(streamField.getFieldComment());
- fieldInfo.setFieldValue(streamField.getFieldValue());
+ fieldInfo.setFieldName(field.getFieldName());
+ fieldInfo.setFieldType(field.getFieldType().toString());
+ fieldInfo.setFieldComment(field.getFieldComment());
+ fieldInfo.setFieldValue(field.getFieldValue());
+ fieldInfo.setIsMetaField(field.getIsMetaField());
return fieldInfo;
}).collect(Collectors.toList());
- return fieldInfos;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index 8efb7cf..d5d920a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -46,8 +46,8 @@ public class SinkFieldRequest {
@ApiModelProperty("Source field type")
private String sourceFieldType;
- @ApiModelProperty("Is source meta field, 0: no, 1: yes")
- private Integer isSourceMetaField = 0;
+ @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+ private Integer isMetaField = 0;
@ApiModelProperty("Field order")
private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index 773d476..8933a25 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -51,8 +51,8 @@ public class SinkFieldResponse {
@ApiModelProperty("Source field type")
private String sourceFieldType;
- @ApiModelProperty("Is source meta field, 0: no, 1: yes")
- private Integer isSourceMetaField = 0;
+ @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+ private Integer isMetaField = 0;
@ApiModelProperty("Field order")
private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
index 754280a..d10873f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamFieldInfo.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
- * Inlong stream field info
+ * Inlong stream field info.
*/
@Data
@ApiModel("Inlong stream field info")
@@ -48,14 +48,16 @@ public class InlongStreamFieldInfo {
@ApiModelProperty(value = "value expression of predefined field")
private String preExpression;
+ @ApiModelProperty("Field type")
private String fieldType;
+ @ApiModelProperty("Field comment")
private String fieldComment;
+ @ApiModelProperty("Is this field a meta field, 0: no, 1: yes")
+ private Integer isMetaField = 0;
+
@ApiModelProperty(value = "field rank num")
private Short rankNum;
- @ApiModelProperty(value = "is deleted? 0: deleted, > 0: not deleted")
- private Integer isDeleted = 0;
-
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
index a13934c..0109030 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamFieldEntity.java
@@ -17,9 +17,10 @@
package org.apache.inlong.manager.dao.entity;
-import java.io.Serializable;
import lombok.Data;
+import java.io.Serializable;
+
@Data
public class InlongStreamFieldEntity implements Serializable {
@@ -33,6 +34,7 @@ public class InlongStreamFieldEntity implements Serializable {
private String preExpression;
private String fieldType;
private String fieldComment;
+ private Integer isMetaField;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 98f888f..a32a123 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -37,7 +37,7 @@ public class StreamSinkFieldEntity implements Serializable {
private Integer isRequired;
private String sourceFieldName;
private String sourceFieldType;
- private Integer isSourceMetaField;
+ private Integer isMetaField;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
index 4486e1c..fb20c11 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
@@ -26,34 +26,30 @@ import java.util.List;
@Repository
public interface InlongStreamFieldEntityMapper {
- int deleteByPrimaryKey(Integer id);
-
int insert(InlongStreamFieldEntity record);
- int insertSelective(InlongStreamFieldEntity record);
+ int insertAll(@Param("fieldList") List<InlongStreamFieldEntity> fieldEntityList);
InlongStreamFieldEntity selectByPrimaryKey(Integer id);
- int updateByPrimaryKeySelective(InlongStreamFieldEntity record);
-
- int updateByPrimaryKey(InlongStreamFieldEntity record);
-
List<InlongStreamFieldEntity> selectByIdentifier(@Param("groupId") String groupId,
@Param("streamId") String streamId);
- int insertAll(@Param("fieldList") List<InlongStreamFieldEntity> fieldEntityList);
-
- List<InlongStreamFieldEntity> selectStreamFields(@Param("groupId") String groupId,
+ List<InlongStreamFieldEntity> selectFields(@Param("groupId") String groupId,
@Param("streamId") String streamId);
- /**
- * According to the inlong group id and inlong stream id, physically delete all fields
- */
- int deleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ int updateByPrimaryKey(InlongStreamFieldEntity record);
+
+ int deleteByPrimaryKey(Integer id);
/**
* According to the inlong group id and inlong stream id, logically delete all fields
*/
int logicDeleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ /**
+ * According to the inlong group id and inlong stream id, physically delete all fields
+ */
+ int deleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index 4bee203..08f1afb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -30,148 +30,38 @@
<result column="pre_expression" jdbcType="VARCHAR" property="preExpression"/>
<result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
+ <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
<result column="rank_num" jdbcType="INTEGER" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
- pre_expression, field_type, field_comment, rank_num, is_deleted
+ pre_expression, field_type, field_comment, is_meta_field, rank_num, is_deleted
</sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
- select
- <include refid="Base_Column_List"/>
- from inlong_stream_field
- where id = #{id,jdbcType=INTEGER}
- </select>
- <select id="selectByIdentifier" resultType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
- select
- <include refid="Base_Column_List"/>
- from inlong_stream_field
- where inlong_group_id = #{groupId, jdbcType=VARCHAR}
- and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- and is_deleted = 0
- order by rank_num asc
- </select>
- <select id="selectStreamFields" resultMap="BaseResultMap">
- select f.*
- from inlong_stream_field f,
- inlong_stream s
- where f.is_deleted = 0
- and s.is_deleted = 0
- and s.id = f.inlong_stream_id
- and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
- and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- and f.is_predefined_field = 0
- </select>
-
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete
- from inlong_stream_field
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <delete id="deleteAllByIdentifier">
- delete
- from inlong_stream_field
- where inlong_group_id = #{groupId, jdbcType=VARCHAR}
- and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- and is_deleted = 0
- </delete>
<insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
insert into inlong_stream_field (id, inlong_group_id, inlong_stream_id,
is_predefined_field, field_name, field_value,
pre_expression, field_type, field_comment,
- rank_num, is_deleted)
+ is_meta_field, rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{isPredefinedField,jdbcType=INTEGER}, #{fieldName,jdbcType=VARCHAR}, #{fieldValue,jdbcType=VARCHAR},
#{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
- </insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
- insert into inlong_stream_field
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id,
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id,
- </if>
- <if test="isPredefinedField != null">
- is_predefined_field,
- </if>
- <if test="fieldName != null">
- field_name,
- </if>
- <if test="fieldValue != null">
- field_value,
- </if>
- <if test="preExpression != null">
- pre_expression,
- </if>
- <if test="fieldType != null">
- field_type,
- </if>
- <if test="fieldComment != null">
- field_comment,
- </if>
- <if test="rankNum != null">
- rank_num,
- </if>
- <if test="isDeleted != null">
- is_deleted,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="inlongGroupId != null">
- #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="isPredefinedField != null">
- #{isPredefinedField,jdbcType=INTEGER},
- </if>
- <if test="fieldName != null">
- #{fieldName,jdbcType=VARCHAR},
- </if>
- <if test="fieldValue != null">
- #{fieldValue,jdbcType=VARCHAR},
- </if>
- <if test="preExpression != null">
- #{preExpression,jdbcType=VARCHAR},
- </if>
- <if test="fieldType != null">
- #{fieldType,jdbcType=VARCHAR},
- </if>
- <if test="fieldComment != null">
- #{fieldComment,jdbcType=VARCHAR},
- </if>
- <if test="rankNum != null">
- #{rankNum,jdbcType=SMALLINT},
- </if>
- <if test="isDeleted != null">
- #{isDeleted,jdbcType=INTEGER},
- </if>
- </trim>
+ #{isMetaField,jdbcType=SMALLINT}, #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
-
<!-- Bulk insert, update if it exists -->
<insert id="insertAll" parameterType="java.util.List">
- insert into inlong_stream_field
- (id, inlong_group_id, inlong_stream_id, is_predefined_field, field_name, field_value,
- pre_expression, field_type, field_comment, rank_num, is_deleted
+ insert into inlong_stream_field (
+ id, inlong_group_id, inlong_stream_id, is_predefined_field,
+ field_name, field_value, pre_expression, field_type,
+ field_comment, is_meta_field, rank_num, is_deleted
)
values
<foreach collection="fieldList" index="index" item="item" separator=",">
- (#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
- #{item.fieldName}, #{item.fieldValue},
- #{item.preExpression}, #{item.fieldType}, #{item.fieldComment}, #{item.rankNum}, #{item.isDeleted}
+ (
+ #{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.isPredefinedField},
+ #{item.fieldName}, #{item.fieldValue}, #{item.preExpression}, #{item.fieldType},
+ #{item.fieldComment}, #{item.isMetaField}, #{item.rankNum}, #{item.isDeleted}
)
</foreach>
ON DUPLICATE KEY UPDATE
@@ -188,43 +78,33 @@
is_deleted = values(is_deleted)
</insert>
- <update id="updateByPrimaryKeySelective"
- parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
- update inlong_stream_field
- <set>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="isPredefinedField != null">
- is_predefined_field = #{isPredefinedField,jdbcType=INTEGER},
- </if>
- <if test="fieldName != null">
- field_name = #{fieldName,jdbcType=VARCHAR},
- </if>
- <if test="fieldValue != null">
- field_value = #{fieldValue,jdbcType=VARCHAR},
- </if>
- <if test="preExpression != null">
- pre_expression = #{preExpression,jdbcType=VARCHAR},
- </if>
- <if test="fieldType != null">
- field_type = #{fieldType,jdbcType=VARCHAR},
- </if>
- <if test="fieldComment != null">
- field_comment = #{fieldComment,jdbcType=VARCHAR},
- </if>
- <if test="rankNum != null">
- rank_num = #{rankNum,jdbcType=SMALLINT},
- </if>
- <if test="isDeleted != null">
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </if>
- </set>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_stream_field
where id = #{id,jdbcType=INTEGER}
- </update>
+ </select>
+ <select id="selectByIdentifier" resultType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_stream_field
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ and is_deleted = 0
+ order by id asc
+ </select>
+ <select id="selectFields" resultMap="BaseResultMap">
+ select field.*
+ from inlong_stream_field field,
+ inlong_stream stream
+ where field.is_deleted = 0
+ and stream.is_deleted = 0
+ and stream.id = field.inlong_stream_id
+ and stream.inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and stream.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ and field.is_predefined_field = 0
+ </select>
+
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity">
update inlong_stream_field
set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
@@ -247,4 +127,17 @@
and is_deleted = 0
</update>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
+ from inlong_stream_field
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <delete id="deleteAllByIdentifier">
+ delete
+ from inlong_stream_field
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </delete>
+
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index c04e68f..c9e495f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -31,13 +31,13 @@
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
<result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
- <result column="is_source_meta_field" jdbcType="SMALLINT" property="isSourceMetaField"/>
+ <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
id, sink_id, field_name, field_type, field_comment,
- source_field_name, source_field_type, is_source_meta_field, rank_num, is_deleted
+ source_field_name, source_field_type, is_meta_field, rank_num, is_deleted
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -45,12 +45,12 @@
insert into stream_sink_field (id, inlong_group_id, inlong_stream_id,
sink_id, sink_type, field_name,
field_type, field_comment, source_field_name,
- source_field_type, is_source_meta_field,
+ source_field_type, is_meta_field,
rank_num, is_deleted)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sinkId,jdbcType=INTEGER}, #{sinkType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
#{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR}, #{sourceFieldName,jdbcType=VARCHAR},
- #{sourceFieldType,jdbcType=VARCHAR}, #{isSourceMetaField,jdbcType=SMALLINT},
+ #{sourceFieldType,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT},
#{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertAll">
@@ -60,8 +60,7 @@
sink_type, field_name,
field_type, field_comment,
source_field_name, source_field_type,
- is_source_meta_field,
- rank_num, is_deleted
+ is_meta_field, rank_num, is_deleted
)
values
<foreach collection="list" index="index" item="item" separator=",">
@@ -71,8 +70,8 @@
#{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
#{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
- #{item.isSourceMetaField,jdbcType=SMALLINT},
- #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
+ #{item.isMetaField,jdbcType=SMALLINT}, #{item.rankNum,jdbcType=SMALLINT},
+ #{item.isDeleted,jdbcType=INTEGER}
)
</foreach>
</insert>
@@ -89,7 +88,7 @@
from stream_sink_field
where sink_id = #{sinkId, jdbcType=INTEGER}
and is_deleted = 0
- order by rank_num asc
+ order by id asc
</select>
<select id="selectFields" resultMap="BaseResultMap">
select field.*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index bc23767..362aaff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -50,6 +50,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -212,22 +213,30 @@ public class CommonOperateService {
// Get all field info
List<FieldInfo> sourceFields = new ArrayList<>();
List<FieldInfo> sinkFields = new ArrayList<>();
- String partition = null;
+ // TODO Need support hive partition field
if (SinkType.forType(sinkResponse.getSinkType()) == SinkType.HIVE) {
HiveSinkResponse hiveSink = (HiveSinkResponse) sinkResponse;
- partition = hiveSink.getPrimaryPartition();
+ String partition = hiveSink.getPrimaryPartition();
}
// TODO Support more than one source and one sink
final SourceResponse sourceResponse = sourceList.get(0);
boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
- FieldMappingRule fieldMappingRule = FieldInfoUtils.createFieldInfo(isAllMigration,
- sinkResponse.getFieldList(), sourceFields, sinkFields, partition);
+
+ List<FieldMappingUnit> mappingUnitList;
+ InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
+ if (isAllMigration) {
+ mappingUnitList = FieldInfoUtils.setAllMigrationFieldMapping(sourceFields, sinkFields);
+ } else {
+ mappingUnitList = FieldInfoUtils.createFieldInfo(streamInfo.getFieldList(),
+ sinkResponse.getFieldList(), sourceFields, sinkFields);
+ }
+
+ FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
// Get source info
String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
- InlongStreamResponse streamInfo = streamService.get(groupId, streamId);
SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
groupInfo, streamInfo, sourceResponse, sourceFields);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index cb45a55..23215c1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -309,7 +309,7 @@ public class AgentServiceImpl implements AgentService {
}
List<InlongStreamFieldEntity> preFields = streamFieldMapper
- .selectStreamFields(config.getInlongGroupId(), config.getInlongStreamId());
+ .selectFields(config.getInlongGroupId(), config.getInlongStreamId());
if (!config.getSortType().equalsIgnoreCase("13")) {
int fIndex = 0;
@@ -457,7 +457,7 @@ public class AgentServiceImpl implements AgentService {
s.append("p=t").append("&");
}
- List<InlongStreamFieldEntity> preFields = streamFieldMapper.selectStreamFields(
+ List<InlongStreamFieldEntity> preFields = streamFieldMapper.selectFields(
config.getInlongGroupId(),
config.getInlongStreamId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 68f24c4..268dc78 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -619,12 +619,12 @@ public class InlongStreamServiceImpl implements InlongStreamService {
* <p/>First physically delete the existing field information, and then add the field information of this batch
*/
@Transactional(rollbackFor = Throwable.class)
- void updateField(String groupId, String streamId, List<InlongStreamFieldInfo> fieldInfoList) {
+ void updateField(String groupId, String streamId, List<InlongStreamFieldInfo> fieldList) {
LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId,
- fieldInfoList);
+ fieldList);
try {
streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
- saveField(groupId, streamId, fieldInfoList);
+ saveField(groupId, streamId, fieldList);
LOGGER.info("success to update inlong stream field for groupId={}", groupId);
} catch (Exception e) {
LOGGER.error("failed to update inlong stream field: ", e);
@@ -637,13 +637,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
if (CollectionUtils.isEmpty(infoList)) {
return;
}
- List<InlongStreamFieldEntity> entities = CommonBeanUtils.copyListProperties(infoList,
+ List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(infoList,
InlongStreamFieldEntity::new);
- for (InlongStreamFieldEntity entity : entities) {
+ for (InlongStreamFieldEntity entity : list) {
entity.setInlongGroupId(groupId);
entity.setInlongStreamId(streamId);
+ entity.setIsDeleted(Constant.UN_DELETED);
}
- streamFieldMapper.insertAll(entities);
+ streamFieldMapper.insertAll(list);
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
index bb386f8..a14276e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.service.thirdparty.sort.util;
-import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.MetaFieldType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
import org.apache.inlong.sort.formats.common.ByteFormatInfo;
@@ -39,7 +39,6 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
import java.util.ArrayList;
@@ -68,51 +67,32 @@ public class FieldInfoUtils {
/**
* Get field info list.
- * TODO 1. Support partition field, 2. Add is_metadata field in StreamSinkFieldEntity
+ * TODO 1. Support partition field (no need to add at index 0), 2. Add is_metadata field in StreamSinkFieldEntity
*/
- public static FieldMappingRule createFieldInfo(boolean isAllMigration, List<SinkFieldResponse> fieldList,
- List<FieldInfo> sourceFields, List<FieldInfo> sinkFields, String partitionField) {
+ public static List<FieldMappingUnit> createFieldInfo(
+ List<InlongStreamFieldInfo> streamFieldList, List<SinkFieldResponse> fieldList,
+ List<FieldInfo> sourceFields, List<FieldInfo> sinkFields) {
+
+ // Set source field info list.
+ for (InlongStreamFieldInfo field : streamFieldList) {
+ FieldInfo sourceField = getFieldInfo(field.getFieldName(), field.getFieldType(),
+ field.getIsMetaField() == 1);
+ sourceFields.add(sourceField);
+ }
- List<FieldMappingUnit> fieldMappingUnitList = new ArrayList<>();
- if (isAllMigration) {
- setAllMigrationBuiltInField(sourceFields, sinkFields, fieldMappingUnitList);
- } else {
- boolean duplicate = false;
- for (SinkFieldResponse field : fieldList) {
- // If the field name equals to build-in field, new a build-in field info
- FieldInfo sourceFieldInfo = getFieldInfo(field.getSourceFieldName(),
- field.getSourceFieldType(), field.getIsSourceMetaField() == 1);
- sourceFields.add(sourceFieldInfo);
-
- // Get sink field info
- String sinkFieldName = field.getFieldName();
- if (sinkFieldName.equals(partitionField)) {
- duplicate = true;
- }
- FieldInfo sinkFieldInfo = getSinkFieldInfo(field.getFieldName(), field.getFieldType(),
- field.getSourceFieldName(), field.getIsSourceMetaField() == 1);
- sinkFields.add(sinkFieldInfo);
-
- fieldMappingUnitList.add(new FieldMappingUnit(sourceFieldInfo, sinkFieldInfo));
- }
+ List<FieldMappingUnit> mappingUnitList = new ArrayList<>();
+ // Get the sink field info list; if a field name matches a built-in field, create a built-in field info
+ for (SinkFieldResponse field : fieldList) {
+ FieldInfo sinkField = getFieldInfo(field.getFieldName(), field.getFieldType(),
+ field.getIsMetaField() == 1);
+ sinkFields.add(sinkField);
- // If no partition field in the ordinary fields, add the partition field to the first position
- if (!duplicate && StringUtils.isNotEmpty(partitionField)) {
- FieldInfo fieldInfo = new FieldInfo(partitionField, new TimestampFormatInfo("MILLIS"));
- sourceFields.add(0, fieldInfo);
- }
+ FieldInfo sourceField = getFieldInfo(field.getSourceFieldName(),
+ field.getSourceFieldType(), field.getIsMetaField() == 1);
+ mappingUnitList.add(new FieldMappingUnit(sourceField, sinkField));
}
- return new FieldMappingRule(fieldMappingUnitList.toArray(new FieldMappingUnit[0]));
- }
-
- /**
- * Get field info by the given field name ant type.
- *
- * @apiNote If the field name equals to build-in field, new a build-in field info
- */
- private static FieldInfo getFieldInfo(String fieldName, String fieldType) {
- return getFieldInfo(fieldName, fieldType, false);
+ return mappingUnitList;
}
/**
@@ -133,33 +113,16 @@ public class FieldInfoUtils {
}
/**
- * Get field info by the given field name ant type.
- *
- * @apiNote If the field name equals to build-in field, new a build-in field info
+ * Get all migration field mapping unit list for binlog source.
*/
- private static FieldInfo getSinkFieldInfo(String fieldName, String fieldType,
- String sourceFieldName, boolean isBuiltin) {
- FieldInfo fieldInfo;
- BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(sourceFieldName);
- FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
- if (isBuiltin && builtInField != null) {
- fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
- } else {
- fieldInfo = new FieldInfo(fieldName, formatInfo);
- }
- return fieldInfo;
- }
-
- /**
- * Get all migration built-in field for binlog source.
- */
- public static void setAllMigrationBuiltInField(List<FieldInfo> sourceFields, List<FieldInfo> sinkFields,
- List<FieldMappingUnit> fieldMappingUnitList) {
- BuiltInFieldInfo dataField = new BuiltInFieldInfo("data",
- StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATA);
+ public static List<FieldMappingUnit> setAllMigrationFieldMapping(List<FieldInfo> sourceFields,
+ List<FieldInfo> sinkFields) {
+ List<FieldMappingUnit> mappingUnitList = new ArrayList<>();
+ BuiltInFieldInfo dataField = new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
+ BuiltInField.MYSQL_METADATA_DATA);
sourceFields.add(dataField);
sinkFields.add(dataField);
- fieldMappingUnitList.add(new FieldMappingUnit(dataField, dataField));
+ mappingUnitList.add(new FieldMappingUnit(dataField, dataField));
for (Map.Entry<String, BuiltInField> entry : BUILT_IN_FIELD_MAP.entrySet()) {
if (entry.getKey().equals("data_time")) {
@@ -169,8 +132,10 @@ public class FieldInfoUtils {
StringFormatInfo.INSTANCE, entry.getValue());
sourceFields.add(fieldInfo);
sinkFields.add(fieldInfo);
- fieldMappingUnitList.add(new FieldMappingUnit(fieldInfo, fieldInfo));
+ mappingUnitList.add(new FieldMappingUnit(fieldInfo, fieldInfo));
}
+
+ return mappingUnitList;
}
/**
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 568a1d0..9416ad0 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -404,6 +404,7 @@ CREATE TABLE `inlong_stream_field`
`pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
@@ -637,19 +638,19 @@ CREATE TABLE `stream_sink_ext`
DROP TABLE IF EXISTS `stream_sink_field`;
CREATE TABLE `stream_sink_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `sink_id` int(11) NOT NULL COMMENT 'Sink id',
- `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
- `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
- `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
- `is_source_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_name` varchar(50) NOT NULL COMMENT 'Field name',
- `field_type` varchar(50) NOT NULL COMMENT 'Field type',
- `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `sink_id` int(11) NOT NULL COMMENT 'Sink id',
+ `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
+ `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
+ `field_name` varchar(50) NOT NULL COMMENT 'Field name',
+ `field_type` varchar(50) NOT NULL COMMENT 'Field type',
+ `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
);
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 2c54dc0..7b6b41d 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -426,6 +426,7 @@ CREATE TABLE `inlong_stream_field`
`pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
`field_type` varchar(20) NOT NULL COMMENT 'field type',
`field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
@@ -669,19 +670,19 @@ CREATE TABLE `stream_sink_ext`
DROP TABLE IF EXISTS `stream_sink_field`;
CREATE TABLE `stream_sink_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `sink_id` int(11) NOT NULL COMMENT 'Sink id',
- `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
- `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
- `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
- `is_source_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
- `field_name` varchar(50) NOT NULL COMMENT 'Field name',
- `field_type` varchar(50) NOT NULL COMMENT 'Field type',
- `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `sink_id` int(11) NOT NULL COMMENT 'Sink id',
+ `sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
+ `source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
+ `field_name` varchar(50) NOT NULL COMMENT 'Field name',
+ `field_type` varchar(50) NOT NULL COMMENT 'Field type',
+ `field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink field table';