You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/14 11:00:43 UTC
[incubator-inlong] branch master updated: [INLONG-3112][Manager] Support source metadata field for Sort (#3118)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b7f6fc4 [INLONG-3112][Manager] Support source metadata field for Sort (#3118)
b7f6fc4 is described below
commit b7f6fc47054ce236ab85479b005fa2d19e38ae76
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon Mar 14 19:00:38 2022 +0800
[INLONG-3112][Manager] Support source metadata field for Sort (#3118)
---
.../inlong/manager/client/api/SinkField.java | 6 ++-
.../manager/client/api/source/KafkaSource.java | 13 ++++-
.../client/api/util/InlongStreamSinkTransfer.java | 4 +-
.../api/util/InlongStreamSourceTransfer.java | 10 +++-
.../inlong/manager/common/enums/MetaFieldType.java | 63 ++++++++++++++++++++++
.../manager/common/pojo/sink/SinkFieldRequest.java | 3 ++
.../common/pojo/sink/SinkFieldResponse.java | 3 ++
.../common/pojo/source/kafka/KafkaSourceDTO.java | 16 ++++++
.../pojo/source/kafka/KafkaSourceRequest.java | 12 +++++
.../pojo/source/kafka/KafkaSourceResponse.java | 12 +++++
.../manager/dao/entity/StreamSinkFieldEntity.java | 1 +
.../mappers/StreamSinkFieldEntityMapper.xml | 23 +++++---
.../thirdparty/sort/util/FieldInfoUtils.java | 51 ++++++++++++++----
.../thirdparty/sort/util/SerializationUtils.java | 4 ++
.../main/resources/sql/apache_inlong_manager.sql | 1 +
.../manager-web/sql/apache_inlong_manager.sql | 1 +
16 files changed, 200 insertions(+), 23 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index 7a0b3e3..3dee543 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -33,10 +33,14 @@ public class SinkField extends StreamField {
@ApiModelProperty("Source field type")
private String sourceFieldType;
+ @ApiModelProperty("Is source meta field, 0: no, 1: yes")
+ private Integer isSourceMetaField = 0;
+
+ /**
+ * Create a sink field together with the source field it is mapped from.
+ *
+ * @param index passed through to the StreamField constructor
+ * @param fieldType passed through to the StreamField constructor
+ * @param fieldName passed through to the StreamField constructor
+ * @param fieldComment passed through to the StreamField constructor
+ * @param fieldValue passed through to the StreamField constructor
+ * @param sourceFieldName source field name
+ * @param sourceFieldType source field type
+ * @param isSourceMetaField 1 if the source field is a meta field, 0 if not; null is stored as 0
+ */
public SinkField(int index, FieldType fieldType, String fieldName, String fieldComment,
- String fieldValue, String sourceFieldName, String sourceFieldType) {
+ String fieldValue, String sourceFieldName, String sourceFieldType, Integer isSourceMetaField) {
super(index, fieldType, fieldName, fieldComment, fieldValue);
this.sourceFieldName = sourceFieldName;
this.sourceFieldType = sourceFieldType;
+ // Keep the declared default (0) when null is passed: the flag is later unboxed with "== 1"
+ this.isSourceMetaField = isSourceMetaField == null ? 0 : isSourceMetaField;
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index 1e64253..61a4faf 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -24,8 +24,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
-import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.KafkaOffset;
+import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.common.enums.SourceType;
@Data
@@ -68,4 +68,15 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty(value = "The strategy of auto offset reset")
private KafkaOffset autoOffsetReset;
+ @ApiModelProperty("database pattern used for filter in canal format")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern used for filter in canal format")
+ private String tablePattern;
+
+ @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+ private boolean ignoreParseErrors = true;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 8c517c3..8214342 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -152,7 +152,8 @@ public class InlongStreamSinkTransfer {
sinkFieldResponse.getFieldName(),
sinkFieldResponse.getFieldComment(),
null, sinkFieldResponse.getSourceFieldName(),
- sinkFieldResponse.getSourceFieldType())).collect(Collectors.toList());
+ sinkFieldResponse.getSourceFieldType(),
+ sinkFieldResponse.getIsSourceMetaField())).collect(Collectors.toList());
}
@@ -239,6 +240,7 @@ public class InlongStreamSinkTransfer {
sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
sinkFieldRequest.setSourceFieldName(sinkField.getSourceFieldName());
sinkFieldRequest.setSourceFieldType(sinkField.getSourceFieldType());
+ sinkFieldRequest.setIsSourceMetaField(sinkField.getIsSourceMetaField());
sinkFieldRequests.add(sinkFieldRequest);
}
return sinkFieldRequests;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 9b58ddc..0294a79 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -21,12 +21,12 @@ import com.google.common.base.Joiner;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.KafkaOffset;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.StreamSource.SyncType;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
-import org.apache.inlong.manager.client.api.KafkaOffset;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -95,6 +95,10 @@ public class InlongStreamSourceTransfer {
kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
kafkaSource.setSyncType(SyncType.FULL);
+ kafkaSource.setDatabasePattern(kafkaSourceResponse.getDatabasePattern());
+ kafkaSource.setTablePattern(kafkaSourceResponse.getTablePattern());
+ kafkaSource.setIgnoreParseErrors(kafkaSourceResponse.isIgnoreParseErrors());
+ kafkaSource.setTimestampFormatStandard(kafkaSourceResponse.getTimestampFormatStandard());
return kafkaSource;
}
@@ -183,6 +187,10 @@ public class InlongStreamSourceTransfer {
sourceRequest.setAutoOffsetReset(kafkaSource.getAutoOffsetReset().getName());
sourceRequest.setGroupId(kafkaSource.getConsumerGroup());
sourceRequest.setSerializationType(kafkaSource.getDataFormat().getName());
+ sourceRequest.setDatabasePattern(kafkaSource.getDatabasePattern());
+ sourceRequest.setTablePattern(kafkaSource.getTablePattern());
+ sourceRequest.setIgnoreParseErrors(kafkaSource.isIgnoreParseErrors());
+ sourceRequest.setTimestampFormatStandard(kafkaSource.getTimestampFormatStandard());
return sourceRequest;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
new file mode 100644
index 0000000..846ceac
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.enums;
+
+import lombok.Getter;
+
+/**
+ * Meta fields of a source record, each with its field name and a short description.
+ */
+@Getter
+public enum MetaFieldType {
+
+ /**
+ * Meta field "database"
+ */
+ DATABASE("database", "meta field database used in canal json or mysql binlog and so on"),
+
+ /**
+ * Meta field "data_time"
+ */
+ DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlog and so on"),
+
+ /**
+ * Meta field "table"
+ */
+ TABLE("table", "meta field table used in canal json or mysql binlog and so on"),
+
+ /**
+ * Meta field "event_time"
+ */
+ EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlog and so on"),
+
+ /**
+ * Meta field "is_ddl"
+ */
+ IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlog and so on"),
+
+ /**
+ * Meta field "event_type"
+ */
+ EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlog and so on");
+
+ // Meta field name, e.g. "data_time"
+ private final String name;
+
+ // Human-readable description of the meta field
+ private final String description;
+
+ MetaFieldType(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
index c3aceb1..8efb7cf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldRequest.java
@@ -46,6 +46,9 @@ public class SinkFieldRequest {
@ApiModelProperty("Source field type")
private String sourceFieldType;
+ @ApiModelProperty("Is source meta field, 0: no, 1: yes")
+ private Integer isSourceMetaField = 0;
+
@ApiModelProperty("Field order")
private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
index cdaffe8..773d476 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldResponse.java
@@ -51,6 +51,9 @@ public class SinkFieldResponse {
@ApiModelProperty("Source field type")
private String sourceFieldType;
+ @ApiModelProperty("Is source meta field, 0: no, 1: yes")
+ private Integer isSourceMetaField = 0;
+
@ApiModelProperty("Field order")
private Short rankNum;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index f4fb9bb..2c5c28f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -71,6 +71,18 @@ public class KafkaSourceDTO {
@ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
private String serializationType;
+ @ApiModelProperty("database pattern used for filter in canal format")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern used for filter in canal format")
+ private String tablePattern;
+
+ @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+ private boolean ignoreParseErrors;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard;
+
/**
* Get the dto instance from the request
*/
@@ -84,6 +96,10 @@ public class KafkaSourceDTO {
.topicPartitionOffset(request.getTopicPartitionOffset())
.autoOffsetReset(request.getAutoOffsetReset())
.serializationType(request.getSerializationType())
+ .databasePattern(request.getDatabasePattern())
+ .tablePattern(request.getTablePattern())
+ .ignoreParseErrors(request.isIgnoreParseErrors())
+ .timestampFormatStandard(request.getTimestampFormatStandard())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 84110d1..0b6a230 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -62,6 +62,18 @@ public class KafkaSourceRequest extends SourceRequest {
notes = "including earliest, latest (the default), none")
private String autoOffsetReset;
+ @ApiModelProperty("database pattern used for filter in canal format")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern used for filter in canal format")
+ private String tablePattern;
+
+ @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+ private boolean ignoreParseErrors = true;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
+
public KafkaSourceRequest() {
this.setSourceType(SourceType.KAFKA.toString());
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 34b178b..3055db1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -55,6 +55,18 @@ public class KafkaSourceResponse extends SourceResponse {
@ApiModelProperty(value = "The strategy of auto offset reset")
private String autoOffsetReset;
+ @ApiModelProperty("database pattern used for filter in canal format")
+ private String databasePattern;
+
+ @ApiModelProperty("table pattern used for filter in canal format")
+ private String tablePattern;
+
+ @ApiModelProperty("ignore parse errors, true: ignore parse error; false: not ignore parse error; default true")
+ private boolean ignoreParseErrors;
+
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard;
+
public KafkaSourceResponse() {
this.setSourceType(SourceType.KAFKA.name());
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 91f988a..98f888f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -37,6 +37,7 @@ public class StreamSinkFieldEntity implements Serializable {
private Integer isRequired;
private String sourceFieldName;
private String sourceFieldType;
+ private Integer isSourceMetaField;
private Short rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index d647045..a1fd0bc 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -18,7 +18,8 @@
under the License.
-->
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper">
<resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity">
<id column="id" jdbcType="INTEGER" property="id"/>
@@ -31,25 +32,30 @@
<result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
<result column="source_field_name" jdbcType="VARCHAR" property="sourceFieldName"/>
<result column="source_field_type" jdbcType="VARCHAR" property="sourceFieldType"/>
+ <result column="is_source_meta_field" jdbcType="INTEGER" property="isSourceMetaField"/>
<result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
</resultMap>
<sql id="Base_Column_List">
- id, sink_id, field_name, field_type, field_comment,
- source_field_name, source_field_type, rank_num, is_deleted
+ id
+ , sink_id, field_name, field_type, field_comment,
+ source_field_name, source_field_type, is_source_meta_field, rank_num, is_deleted
</sql>
- <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+ <!-- Insert one sink field row. Each jdbcType must match a JdbcType enum constant exactly (upper case, e.g. INTEGER). -->
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity">
insert into stream_sink_field (id, inlong_group_id, inlong_stream_id,
sink_id, sink_type,
field_name, field_type, field_comment,
- source_field_name, source_field_type,
+ source_field_name, source_field_type, is_source_meta_field,
rank_num, is_deleted)
- values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
+ #{inlongStreamId,jdbcType=VARCHAR},
#{sinkId,jdbcType=INTEGER}, #{sinkType,jdbcType=VARCHAR},
- #{fieldName,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR}, #{fieldComment,jdbcType=VARCHAR},
+ #{fieldName,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
+ #{fieldComment,jdbcType=VARCHAR},
#{sourceFieldName,jdbcType=VARCHAR}, #{sourceFieldType,jdbcType=VARCHAR},
+ #{isSourceMetaField,jdbcType=INTEGER},
#{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
<insert id="insertAll">
@@ -58,7 +64,7 @@
inlong_stream_id, sink_id,
sink_type, field_name,
field_type, field_comment,
- source_field_name, source_field_type,
+ source_field_name, source_field_type,is_source_meta_field,
rank_num, is_deleted
)
values
@@ -69,6 +75,7 @@
#{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
#{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
#{item.sourceFieldName,jdbcType=VARCHAR}, #{item.sourceFieldType,jdbcType=VARCHAR},
+ #{item.isSourceMetaField,jdbcType=INTEGER},
#{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER}
)
</foreach>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
index 0bb4a7d..b7de742 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/FieldInfoUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.thirdparty.sort.util;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.MetaFieldType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
@@ -56,12 +57,12 @@ public class FieldInfoUtils {
public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new HashMap<>();
static {
- BUILT_IN_FIELD_MAP.put("data_time", BuiltInField.DATA_TIME);
- BUILT_IN_FIELD_MAP.put("database", BuiltInField.MYSQL_METADATA_DATABASE);
- BUILT_IN_FIELD_MAP.put("table", BuiltInField.MYSQL_METADATA_TABLE);
- BUILT_IN_FIELD_MAP.put("event_time", BuiltInField.MYSQL_METADATA_EVENT_TIME);
- BUILT_IN_FIELD_MAP.put("is_ddl", BuiltInField.MYSQL_METADATA_IS_DDL);
- BUILT_IN_FIELD_MAP.put("event_type", BuiltInField.MYSQL_METADATA_EVENT_TYPE);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(), BuiltInField.DATA_TIME);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.DATABASE.getName(), BuiltInField.MYSQL_METADATA_DATABASE);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.TABLE.getName(), BuiltInField.MYSQL_METADATA_TABLE);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TIME.getName(), BuiltInField.MYSQL_METADATA_EVENT_TIME);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(), BuiltInField.MYSQL_METADATA_IS_DDL);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(), BuiltInField.MYSQL_METADATA_EVENT_TYPE);
}
/**
@@ -78,7 +79,8 @@ public class FieldInfoUtils {
boolean duplicate = false;
for (SinkFieldResponse field : fieldList) {
// If the field name equals to build-in field, new a build-in field info
- FieldInfo sourceFieldInfo = getFieldInfo(field.getSourceFieldName(), field.getSourceFieldType());
+ FieldInfo sourceFieldInfo = getFieldInfo(field.getSourceFieldName(),
+ field.getSourceFieldType(), field.getIsSourceMetaField() == 1);
sourceFields.add(sourceFieldInfo);
// Get sink field info
@@ -86,7 +88,8 @@ public class FieldInfoUtils {
if (sinkFieldName.equals(partitionField)) {
duplicate = true;
}
- FieldInfo sinkFieldInfo = getFieldInfo(field.getFieldName(), field.getFieldType());
+ FieldInfo sinkFieldInfo = getSinkFieldInfo(field.getFieldName(), field.getFieldType(),
+ field.getSourceFieldName(), field.getIsSourceMetaField() == 1);
sinkFields.add(sinkFieldInfo);
fieldMappingUnitList.add(new FieldMappingUnit(sourceFieldInfo, sinkFieldInfo));
@@ -108,15 +111,41 @@ public class FieldInfoUtils {
* @apiNote If the field name equals to build-in field, new a build-in field info
*/
private static FieldInfo getFieldInfo(String fieldName, String fieldType) {
+ return getFieldInfo(fieldName, fieldType, false);
+ }
+
+ /**
+ * Get field info by the given field name and type.
+ *
+ * @param fieldName field name, also used as the lookup key in BUILT_IN_FIELD_MAP
+ * @param fieldType field type, lower-cased before it is passed to convertFieldFormat
+ * @param isBuiltin whether a built-in field info may be created for this field
+ * @return a BuiltInFieldInfo or a plain FieldInfo, see the apiNote
+ * @apiNote A built-in field info is created only if isBuiltin is true and the field name is a built-in field
+ */
+ private static FieldInfo getFieldInfo(String fieldName, String fieldType, boolean isBuiltin) {
FieldInfo fieldInfo;
BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
- if (builtInField == null) {
- fieldInfo = new FieldInfo(fieldName, formatInfo);
- } else {
+ if (isBuiltin && builtInField != null) {
fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+ } else {
+ fieldInfo = new FieldInfo(fieldName, formatInfo);
}
+ return fieldInfo;
+ }
+ /**
+ * Get the sink field info by the given sink field name and type.
+ *
+ * @param fieldName sink field name, used as the name of the returned field info
+ * @param fieldType sink field type, lower-cased before it is passed to convertFieldFormat
+ * @param sourceFieldName source field name, used as the lookup key in BUILT_IN_FIELD_MAP
+ * @param isBuiltin whether a built-in field info may be created for this field
+ * @return a BuiltInFieldInfo or a plain FieldInfo, see the apiNote
+ * @apiNote A built-in field info is created only if isBuiltin is true and the source field name is a built-in field
+ */
+ private static FieldInfo getSinkFieldInfo(String fieldName, String fieldType,
+ String sourceFieldName, boolean isBuiltin) {
+ FieldInfo fieldInfo;
+ BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(sourceFieldName);
+ FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase());
+ if (isBuiltin && builtInField != null) {
+ fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+ } else {
+ fieldInfo = new FieldInfo(fieldName, formatInfo);
+ }
return fieldInfo;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index d0da64e..cea3110 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
@@ -97,6 +98,9 @@ public class SerializationUtils {
return new AvroDeserializationInfo();
case JSON:
return new JsonDeserializationInfo();
+ case CANAL:
+ return new CanalDeserializationInfo(source.getDatabasePattern(), source.getTablePattern(),
+ source.isIgnoreParseErrors(), source.getTimestampFormatStandard(), true);
default:
throw new IllegalArgumentException(
String.format("Unsupported serializationType for Kafka source: %s", serializationType));
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index b83f493..00272a6 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -641,6 +641,7 @@ CREATE TABLE `stream_sink_field`
`sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
`source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
`source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
+ `is_source_meta_field` int(3) DEFAULT '0' COMMENT 'source field is meta field',
`field_name` varchar(50) NOT NULL COMMENT 'Field name',
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e3746c1..e41a53b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -673,6 +673,7 @@ CREATE TABLE `stream_sink_field`
`sink_type` varchar(15) NOT NULL COMMENT 'Sink type',
`source_field_name` varchar(50) DEFAULT NULL COMMENT 'Source field name',
`source_field_type` varchar(50) DEFAULT NULL COMMENT 'Source field type',
+ `is_source_meta_field` int(3) DEFAULT '0' COMMENT 'source field is meta field',
`field_name` varchar(50) NOT NULL COMMENT 'Field name',
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',