You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/15 06:58:21 UTC
[incubator-inlong] branch master updated: [INLONG-3130][Sort] Support extract specified metadata from input data with canal format (#3139)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95887dd [INLONG-3130][Sort] Support extract specified metadata from input data with canal format (#3139)
95887dd is described below
commit 95887dd6e424b777ce44ec8181b0fb935d193e27
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Tue Mar 15 14:58:15 2022 +0800
[INLONG-3130][Sort] Support extract specified metadata from input data with canal format (#3139)
---
.../deserialization/CanalDeserializationInfo.java | 1 +
.../json/canal/CanalJsonDecodingFormat.java | 8 +-
.../json/canal/CanalJsonDeserializationSchema.java | 40 +++---
.../inlong/sort/formats/json/canal/CanalUtils.java | 40 ++++++
.../inlong/sort/singletenant/flink/Entrance.java | 6 +-
.../CanalDeserializationSchemaBuilder.java | 144 +++++++++++----------
.../DebeziumDeserializationSchemaBuilder.java | 11 +-
.../DeserializationSchemaFactory.java | 2 +-
.../deserialization/FieldMappingTransformer.java | 6 +-
.../sort/singletenant/flink/utils/CommonUtils.java | 12 ++
.../deserialization/CanalDeserializationTest.java | 103 ++++++++-------
.../DeserializationFunctionTest.java | 92 +++++++++++++
12 files changed, 317 insertions(+), 148 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
index a6ac92c..175cd8a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CanalDeserializationInfo.java
@@ -42,6 +42,7 @@ public class CanalDeserializationInfo implements DeserializationInfo {
@JsonProperty("timestamp_format_standard")
private final String timestampFormatStandard;
+ @Deprecated
@JsonProperty("include_metadata")
private final boolean includeMetadata;
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
index 343f4e9..c6ac39c 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
@@ -231,7 +231,7 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
EVENT_TIMESTAMP(
"event-timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.BIGINT().nullable(),
DataTypes.FIELD("es", DataTypes.BIGINT()),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@@ -241,7 +241,7 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
if (row.isNullAt(pos)) {
return null;
}
- return TimestampData.fromEpochMillis(row.getLong(pos));
+ return row.getLong(pos);
}
@Override
@@ -289,5 +289,9 @@ public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSc
this.requiredJsonField = requiredJsonField;
this.converter = converter;
}
+
+ /**
+  * Returns the {@code key} of this metadata entry.
+  *
+  * @return the value of the {@code key} field
+  */
+ public String getKey() {
+     return this.key;
+ }
}
}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
index 089debc..c0012ab 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -19,10 +19,13 @@
package org.apache.inlong.sort.formats.json.canal;
import static java.lang.String.format;
+import static org.apache.inlong.sort.formats.json.canal.CanalUtils.getMysqlMetadataKey;
import java.io.IOException;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -35,8 +38,10 @@ import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -67,12 +72,11 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
/** The deserializer to deserialize Canal JSON data. */
private final JsonRowDataDeserializationSchema jsonDeserializer;
- /** Flag that indicates that an additional projection is required for metadata. */
- private final boolean hasMetadata;
-
/** Metadata to be extracted for every record. */
private final MetadataConverter[] metadataConverters;
+ private final List<ReadableMetadata> requestedMetadata;
+
/** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
private final TypeInformation<RowData> producedTypeInfo;
@@ -116,8 +120,8 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
// failOnMissingField
ignoreParseErrors,
timestampFormat);
- this.hasMetadata = requestedMetadata.size() > 0;
this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+ this.requestedMetadata = requestedMetadata;
this.producedTypeInfo = producedTypeInfo;
this.database = database;
this.table = table;
@@ -284,24 +288,26 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
}
}
- private void emitRow(
- GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
- // shortcut in case no output projection is required
- if (!hasMetadata) {
- out.collect(physicalRow);
- return;
- }
+ /**
+ * Emits one row whose first field is a map holding the requested metadata and whose
+ * remaining fields are the physical fields, shifted by one position.
+ *
+ * <p>Metadata whose converter yields {@code null} is left out of the map.
+ *
+ * @param rootRow the row the metadata converters read from
+ * @param physicalRow the row holding the physical fields and the row kind
+ * @param out the collector that receives the produced row
+ */
+ private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
final int physicalArity = physicalRow.getArity();
final int metadataArity = metadataConverters.length;
- final GenericRowData producedRow =
- new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+ final GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + 1);
+
for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
- producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+ producedRow.setField(physicalPos + 1, physicalRow.getField(physicalPos));
}
+
+ // Put metadata in the first field of the emitted RowData
+ Map<StringData, StringData> metadataMap = new HashMap<>();
+
for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
- producedRow.setField(
- physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+ final Object metadataValue = metadataConverters[metadataPos].convert(rootRow);
+ // A converter may return null (for example when the field is null in the root row);
+ // skip such entries instead of calling toString() on null.
+ if (metadataValue == null) {
+ continue;
+ }
+ metadataMap.put(
+ StringData.fromString(getMysqlMetadataKey(requestedMetadata.get(metadataPos))),
+ StringData.fromString(metadataValue.toString())
+ );
}
+ producedRow.setField(0, new GenericMapData(metadataMap));
+
out.collect(producedRow);
}
@@ -325,7 +331,6 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
}
CanalJsonDeserializationSchema that = (CanalJsonDeserializationSchema) o;
return Objects.equals(jsonDeserializer, that.jsonDeserializer)
- && hasMetadata == that.hasMetadata
&& Objects.equals(producedTypeInfo, that.producedTypeInfo)
&& Objects.equals(database, that.database)
&& Objects.equals(table, that.table)
@@ -337,7 +342,6 @@ public final class CanalJsonDeserializationSchema implements DeserializationSche
public int hashCode() {
return Objects.hash(
jsonDeserializer,
- hasMetadata,
producedTypeInfo,
database,
table,
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalUtils.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalUtils.java
new file mode 100644
index 0000000..3c9ce61
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.inlong.sort.formats.json.MysqlBinLogData;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
+
+/**
+ * Static helpers for the canal json format.
+ */
+public final class CanalUtils {
+
+    private CanalUtils() {
+        // Static helpers only; not meant to be instantiated.
+    }
+
+    /**
+     * Returns the {@link MysqlBinLogData} metadata key that corresponds to the given canal metadata.
+     *
+     * @param readableMetadata the canal metadata to translate
+     * @return the matching {@code MysqlBinLogData.MYSQL_METADATA_*} key
+     * @throws IllegalArgumentException if no key is defined for the given metadata
+     */
+    public static String getMysqlMetadataKey(ReadableMetadata readableMetadata) {
+        switch (readableMetadata) {
+            case DATABASE:
+                return MysqlBinLogData.MYSQL_METADATA_DATABASE;
+            case TABLE:
+                return MysqlBinLogData.MYSQL_METADATA_TABLE;
+            case IS_DDL:
+                return MysqlBinLogData.MYSQL_METADATA_IS_DDL;
+            case EVENT_TIMESTAMP:
+                return MysqlBinLogData.MYSQL_METADATA_EVENT_TIME;
+            default:
+                // Name the offending value so the failure can be diagnosed from the message alone.
+                throw new IllegalArgumentException(
+                        "Unsupported canal metadata '" + readableMetadata + "'");
+        }
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index ad42a82..5c78a26 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -41,6 +41,7 @@ import org.apache.inlong.sort.flink.hive.HiveCommitter;
import org.apache.inlong.sort.flink.hive.HiveWriter;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
@@ -155,10 +156,13 @@ public class Entrance {
sourceFields, sourceInfo.getDeserializationInfo());
FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(config, sourceFields);
+ // Currently, the canal and debezium deserialization schemas put a metadata map at the first
+ // position of the deserialized row, so the `appendAttributes` flag must be set to false for them.
DeserializationFunction function = new DeserializationFunction(
schema,
fieldMappingTransformer,
- !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo));
+ !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo)
+ && !(sourceInfo.getDeserializationInfo() instanceof CanalDeserializationInfo));
DataStream<Row> deserializedStream = sourceStream.process(function)
.uid(Constants.DESERIALIZATION_SCHEMA_UID)
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
index 7ca93a5..57c0b7a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationSchemaBuilder.java
@@ -17,53 +17,59 @@
package org.apache.inlong.sort.singletenant.flink.deserialization;
+import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getProducedFieldInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.canal.CanalJsonDecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
-import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
-import org.apache.inlong.sort.formats.common.IntFormatInfo;
-import org.apache.inlong.sort.formats.common.LocalZonedTimestampFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
+import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
public class CanalDeserializationSchemaBuilder {
- private static final List<String> ALL_SUPPORTED_METADATA_KEYS =
- Arrays.asList("database", "table", "sql-type", "pk-names", "ingestion-timestamp", "event-timestamp");
-
public static DeserializationSchema<Row> build(
FieldInfo[] fieldInfos,
CanalDeserializationInfo deserializationInfo
) throws IOException, ClassNotFoundException {
String timestampFormatStandard = deserializationInfo.getTimestampFormatStandard();
- boolean includeMetadata = deserializationInfo.isIncludeMetadata();
- CanalJsonDecodingFormat canalJsonDecodingFormat = createCanalJsonDecodingFormat(
+ CanalJsonDecodingFormat canalJsonDecodingFormat = new CanalJsonDecodingFormat(
deserializationInfo.getDatabase(),
deserializationInfo.getTable(),
deserializationInfo.isIgnoreParseErrors(),
- timestampFormatStandard,
- includeMetadata
+ getTimestampFormatStandard(timestampFormatStandard)
);
- FieldInfo[] convertedInputFields = convertDateToStringFormatInfo(fieldInfos);
+ // Extract required metadata
+ FieldInfo[] metadataFieldInfos = getMetadataFieldInfos(fieldInfos);
+ List<String> requiredMetadataKeys = Arrays.stream(metadataFieldInfos)
+ .map(FieldInfo::getName)
+ .collect(Collectors.toList());
+ canalJsonDecodingFormat.applyReadableMetadata(requiredMetadataKeys);
+
+ FieldInfo[] originPhysicalFieldInfos = CommonUtils.extractNonBuiltInFieldInfos(fieldInfos, false);
+ FieldInfo[] convertedPhysicalFieldInfos = convertDateToStringFormatInfo(originPhysicalFieldInfos);
DeserializationSchema<RowData> canalSchema = canalJsonDecodingFormat.createRuntimeDecoder(
new DynamicTableSource.Context() {
@Override
@@ -77,61 +83,59 @@ public class CanalDeserializationSchemaBuilder {
return null;
}
},
- convertFieldInfosToDataType(convertedInputFields)
+ convertFieldInfosToDataType(convertedPhysicalFieldInfos)
);
- return wrapCanalDeserializationSchema(canalSchema, includeMetadata, fieldInfos, timestampFormatStandard);
+ return wrapCanalDeserializationSchema(canalSchema, originPhysicalFieldInfos, convertedPhysicalFieldInfos);
}
private static DeserializationSchema<Row> wrapCanalDeserializationSchema(
DeserializationSchema<RowData> canalSchema,
- boolean includeMetadata,
- FieldInfo[] origFieldInfos,
- String timestampFormatStandard
- ) throws IOException, ClassNotFoundException {
- FieldInfo[] allFields;
- if (includeMetadata) {
- allFields = new FieldInfo[origFieldInfos.length + ALL_SUPPORTED_METADATA_KEYS.size()];
- System.arraycopy(origFieldInfos, 0, allFields, 0, origFieldInfos.length);
- FieldInfo[] metadataFields = buildMetadataFields(timestampFormatStandard);
- System.arraycopy(metadataFields, 0, allFields, origFieldInfos.length, metadataFields.length);
- } else {
- allFields = origFieldInfos;
- }
-
- FieldInfo[] convertedAllFields = convertDateToStringFormatInfo(allFields);
- RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema =
- new RowDataToRowDeserializationSchemaWrapper(canalSchema, convertedAllFields);
- return new CustomDateFormatDeserializationSchemaWrapper(rowDataToRowSchema, extractFormatInfos(allFields));
- }
-
- private static CanalJsonDecodingFormat createCanalJsonDecodingFormat(
- String database,
- String table,
- boolean ignoreParseErrors,
- String timestampFormatStandard,
- boolean includeMetadata
+ FieldInfo[] originPhysicalFieldInfos,
+ FieldInfo[] convertedPhysicalFieldInfos
) {
- TimestampFormat timestampFormat = getTimestampFormatStandard(timestampFormatStandard);
- CanalJsonDecodingFormat canalJsonDecodingFormat =
- new CanalJsonDecodingFormat(database, table, ignoreParseErrors, timestampFormat);
- if (includeMetadata) {
- canalJsonDecodingFormat.applyReadableMetadata(ALL_SUPPORTED_METADATA_KEYS);
- }
- return canalJsonDecodingFormat;
+ RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema = new RowDataToRowDeserializationSchemaWrapper(
+ canalSchema,
+ getProducedFieldInfos(convertedPhysicalFieldInfos));
+ return new CustomDateFormatDeserializationSchemaWrapper(
+ rowDataToRowSchema,
+ extractFormatInfos(getProducedFieldInfos(originPhysicalFieldInfos)));
}
- private static FieldInfo[] buildMetadataFields(String timestampStandard) {
- return new FieldInfo[]{
- new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(0), StringFormatInfo.INSTANCE),
- new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(1), StringFormatInfo.INSTANCE),
- new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(2),
- new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)),
- new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(3), new ArrayFormatInfo(StringFormatInfo.INSTANCE)),
- new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(4), new LocalZonedTimestampFormatInfo(timestampStandard)),
- new FieldInfo(ALL_SUPPORTED_METADATA_KEYS.get(5), new LocalZonedTimestampFormatInfo(timestampStandard))
- };
- }
+ /**
+  * Builds the field infos of the canal metadata requested by the built-in fields in {@code fieldInfos}.
+  *
+  * <p>Only {@link BuiltInFieldInfo} entries are considered. Each supported built-in field is mapped to
+  * a {@link FieldInfo} named after the key of the matching {@link ReadableMetadata}.
+  *
+  * @param fieldInfos the field infos to scan; entries that are not built-in fields are ignored
+  * @return the metadata field infos, in the order the built-in fields appear in {@code fieldInfos}
+  * @throws IllegalArgumentException if a built-in field is not supported by the canal deserialization
+  */
+ public static FieldInfo[] getMetadataFieldInfos(FieldInfo[] fieldInfos) {
+     List<FieldInfo> metadataFieldInfos = new ArrayList<>();
+     for (FieldInfo fieldInfo : fieldInfos) {
+         if (!(fieldInfo instanceof BuiltInFieldInfo)) {
+             continue;
+         }
+         BuiltInField builtInField = ((BuiltInFieldInfo) fieldInfo).getBuiltInField();
+         switch (builtInField) {
+             case MYSQL_METADATA_DATABASE:
+                 metadataFieldInfos.add(new FieldInfo(
+                         ReadableMetadata.DATABASE.getKey(), StringFormatInfo.INSTANCE));
+                 break;
+             case MYSQL_METADATA_TABLE:
+                 metadataFieldInfos.add(new FieldInfo(
+                         ReadableMetadata.TABLE.getKey(), StringFormatInfo.INSTANCE));
+                 break;
+             case MYSQL_METADATA_EVENT_TIME:
+                 metadataFieldInfos.add(new FieldInfo(
+                         ReadableMetadata.EVENT_TIMESTAMP.getKey(), LongFormatInfo.INSTANCE));
+                 break;
+             case MYSQL_METADATA_IS_DDL:
+                 metadataFieldInfos.add(new FieldInfo(
+                         ReadableMetadata.IS_DDL.getKey(), BooleanFormatInfo.INSTANCE));
+                 break;
+             case MYSQL_METADATA_EVENT_TYPE:
+             case MYSQL_METADATA_DATA:
+                 // No canal metadata key is requested for these built-in fields.
+                 break;
+             default:
+                 throw new IllegalArgumentException(
+                         "Unsupported builtin field '" + builtInField + "' in canal deserialization");
+         }
+     }
+     return metadataFieldInfos.toArray(new FieldInfo[0]);
+ }
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
index 8027822..a2b64a3 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
@@ -23,6 +23,7 @@ import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.checkW
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getProducedFieldInfos;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
import java.io.IOException;
@@ -39,7 +40,6 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat;
import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
@@ -129,13 +129,4 @@ public class DebeziumDeserializationSchemaBuilder {
return metadataFieldInfos.toArray(new FieldInfo[0]);
}
-
- public static FieldInfo[] getProducedFieldInfos(FieldInfo[] physicalFieldInfos) {
- List<FieldInfo> results = new ArrayList<>();
- results.add(new FieldInfo(
- "metadata",
- new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE)));
- results.addAll(Arrays.asList(physicalFieldInfos));
- return results.toArray(new FieldInfo[0]);
- }
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
index 1e71b0a..85ce0d3 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
@@ -51,7 +51,7 @@ public class DeserializationSchemaFactory {
return buildAvroDeserializationSchema(extractNonBuiltInFieldInfos(fieldInfos, false));
} else if (deserializationInfo instanceof CanalDeserializationInfo) {
return CanalDeserializationSchemaBuilder.build(
- extractNonBuiltInFieldInfos(fieldInfos, false),
+ fieldInfos,
(CanalDeserializationInfo) deserializationInfo);
} else if (deserializationInfo instanceof DebeziumDeserializationInfo) {
return DebeziumDeserializationSchemaBuilder.build(
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
index 9eb71b0..e827a8b 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
@@ -98,9 +98,11 @@ public class FieldMappingTransformer implements Serializable {
case MYSQL_METADATA_TABLE:
return attributes.get(MysqlBinLogData.MYSQL_METADATA_TABLE);
case MYSQL_METADATA_IS_DDL:
- return BooleanFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_IS_DDL));
+ String isDdlStr = attributes.get(MysqlBinLogData.MYSQL_METADATA_IS_DDL);
+ return isDdlStr == null ? null : BooleanFormatInfo.INSTANCE.deserialize(isDdlStr);
case MYSQL_METADATA_EVENT_TIME:
- return LongFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME));
+ String eventTimeStr = attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME);
+ return eventTimeStr == null ? null : LongFormatInfo.INSTANCE.deserialize(eventTimeStr);
case MYSQL_METADATA_EVENT_TYPE:
return kind.shortString();
case MYSQL_METADATA_DATA:
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index d169fbc..eac8154 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -18,7 +18,9 @@
package org.apache.inlong.sort.singletenant.flink.utils;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
@@ -31,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.DateFormatInfo;
import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.RowFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimeFormatInfo;
@@ -214,4 +217,13 @@ public class CommonUtils {
return false;
}
+ /**
+  * Prepends a {@code "metadata"} field to the given physical fields.
+  *
+  * <p>The metadata field is a map with string keys and string values.
+  *
+  * @param physicalFieldInfos the physical field infos, kept in their original order
+  * @return a new array holding the metadata field followed by all physical fields
+  */
+ public static FieldInfo[] getProducedFieldInfos(FieldInfo[] physicalFieldInfos) {
+     MapFormatInfo metadataFormat = new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE);
+     List<FieldInfo> producedFieldInfos = new ArrayList<>(physicalFieldInfos.length + 1);
+     producedFieldInfos.add(new FieldInfo("metadata", metadataFormat));
+     for (FieldInfo physicalFieldInfo : physicalFieldInfos) {
+         producedFieldInfos.add(physicalFieldInfo);
+     }
+     return producedFieldInfos.toArray(new FieldInfo[0]);
+ }
+
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
index b649ef0..b46d1df 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/CanalDeserializationTest.java
@@ -18,10 +18,21 @@
package org.apache.inlong.sort.singletenant.flink.deserialization;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.BinaryFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
import org.apache.inlong.sort.formats.common.DateFormatInfo;
import org.apache.inlong.sort.formats.common.FloatFormatInfo;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
@@ -30,23 +41,13 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimeFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.formats.json.MysqlBinLogData;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
import org.junit.Test;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
public class CanalDeserializationTest {
private final FieldInfo[] fieldInfos = new FieldInfo[]{
@@ -62,27 +63,40 @@ public class CanalDeserializationTest {
new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
};
+ private final FieldInfo[] fieldInfosWithBuiltinField = new FieldInfo[]{
+ new FieldInfo("id", IntFormatInfo.INSTANCE),
+ new FieldInfo("name", StringFormatInfo.INSTANCE),
+ new FieldInfo("description", StringFormatInfo.INSTANCE),
+ new FieldInfo("weight", FloatFormatInfo.INSTANCE),
+ new BuiltInFieldInfo("database", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
+ new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
+ new BuiltInFieldInfo(
+ "event-timestamp", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
+ new BuiltInFieldInfo("is-ddl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
+ };
+
+ // Builds a nine-field row of sample values. Compared with the previous eight-field layout,
+ // every sample value moves up by one index because index 0 is now occupied by a map.
private Row generateTestRow() {
- Row testRow = new Row(8);
- testRow.setField(0, 1238123899121L);
- testRow.setField(1, "testName");
+ Row testRow = new Row(9);
+ // Index 0 holds an empty map; presumably the metadata slot that generateTestRowWithMetadata
+ // fills with real entries - TODO confirm against the schema builder.
+ testRow.setField(0, new HashMap<>());
+ testRow.setField(1, 1238123899121L);
+ testRow.setField(2, "testName");
byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
- testRow.setField(2, bytes);
+ testRow.setField(3, bytes);
- testRow.setField(3, Date.valueOf("1990-10-14"));
- testRow.setField(4, Time.valueOf("12:12:43"));
- testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
+ testRow.setField(4, Date.valueOf("1990-10-14"));
+ testRow.setField(5, Time.valueOf("12:12:43"));
+ testRow.setField(6, Timestamp.valueOf("1990-10-14 12:12:43"));
Map<String, Long> map = new HashMap<>();
map.put("flink", 123L);
- testRow.setField(6, map);
+ testRow.setField(7, map);
Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
Map<String, Integer> innerMap = new HashMap<>();
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);
- testRow.setField(7, nestedMap);
+ testRow.setField(8, nestedMap);
return testRow;
}
@@ -170,40 +184,41 @@ public class CanalDeserializationTest {
+ "}";
byte[] testBytes = testCanalData.getBytes(StandardCharsets.UTF_8);
- FieldInfo[] fieldInfos = new FieldInfo[]{
- new FieldInfo("id", IntFormatInfo.INSTANCE),
- new FieldInfo("name", StringFormatInfo.INSTANCE),
- new FieldInfo("description", StringFormatInfo.INSTANCE),
- new FieldInfo("weight", FloatFormatInfo.INSTANCE)
- };
-
DeserializationSchema<Row> schemaWithoutFilter = DeserializationSchemaFactory.build(
- fieldInfos,
+ fieldInfosWithBuiltinField,
new CanalDeserializationInfo(null, null, false, "ISO_8601", true)
);
ListCollector<Row> collector1 = new ListCollector<>();
schemaWithoutFilter.deserialize(testBytes, collector1);
List<Row> innerList = collector1.getInnerList();
assertEquals(3, innerList.size());
- Row row = innerList.get(0);
- assertEquals(101, row.getField(0));
- assertEquals("scooter", row.getField(1).toString());
- assertEquals("Small 2-wheel scooter", row.getField(2).toString());
- assertEquals(3.14f, (Float) row.getField(3), 0);
- assertEquals("inventory", row.getField(4).toString());
- assertEquals("products2", row.getField(5).toString());
- assertEquals(4, ((Map<?, ?>) row.getField(6)).size());
- assertEquals("id", ((String[]) row.getField(7))[0]);
- // "es" and "ts" are treated as local in flink-canal
- assertEquals(1589373515477L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(8)).getMillisecond());
- assertEquals(1589373515000L, TimestampData.fromLocalDateTime((LocalDateTime) row.getField(9)).getMillisecond());
+ assertEquals(generateTestRowWithMetadata(), innerList.get(0));
DeserializationSchema<Row> schemaWithFilter = DeserializationSchemaFactory.build(
- fieldInfos,
+ fieldInfosWithBuiltinField,
new CanalDeserializationInfo("NoExistDB", null, false, "ISO_8601", true)
);
ListCollector<Row> collector2 = new ListCollector<>();
schemaWithFilter.deserialize(testBytes, collector2);
assertTrue(collector2.getInnerList().isEmpty());
}
+
+    /**
+     * Builds the row the canal test expects as its first deserialized record: the extracted
+     * metadata as a string-to-string map at index 0, followed by the four physical columns.
+     *
+     * @return the expected five-field row
+     */
+    private Row generateTestRowWithMetadata() {
+        // A plain HashMap replaces double-brace initialization, which would create an anonymous
+        // subclass holding a hidden reference to the enclosing test instance.
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_DATABASE, "inventory");
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_TABLE, "products2");
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME, "1589373515000");
+        metadata.put(MysqlBinLogData.MYSQL_METADATA_IS_DDL, "false");
+
+        Row testRow = new Row(5);
+        testRow.setField(0, metadata);
+        testRow.setField(1, 101);
+        testRow.setField(2, "scooter");
+        testRow.setField(3, "Small 2-wheel scooter");
+        testRow.setField(4, 3.14f);
+
+        return testRow;
+    }
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
index a83c42b..e334df9 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunctionTest.java
@@ -20,8 +20,15 @@ package org.apache.inlong.sort.singletenant.flink.deserialization;
import org.apache.flink.types.Row;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CanalDeserializationInfo;
import org.apache.inlong.sort.singletenant.flink.SerializedRecord;
import org.junit.Test;
@@ -51,4 +58,89 @@ public class DeserializationFunctionTest {
assertEquals(testData, row.getField(0));
}
+    /**
+     * Verifies that a canal-json message wrapped in an InLongMsg is deserialized into a row whose
+     * physical columns are followed by the requested built-in fields (database, table, event
+     * time, is-ddl and event type).
+     *
+     * @throws Exception if building the schema or processing the record fails
+     */
+    @Test
+    public void testDeserializeCanalJson() throws Exception {
+        String testCanalData = "{\n"
+                + " \"data\":[\n"
+                + " {\n"
+                + " \"id\":\"101\",\n"
+                + " \"name\":\"scooter\",\n"
+                + " \"description\":\"Small 2-wheel scooter\",\n"
+                + " \"weight\":\"3.14\"\n"
+                + " },\n"
+                + " {\n"
+                + " \"id\":\"102\",\n"
+                + " \"name\":\"car battery\",\n"
+                + " \"description\":\"12V car battery\",\n"
+                + " \"weight\":\"8.1\"\n"
+                + " },\n"
+                + " {\n"
+                + " \"id\":\"103\",\n"
+                + " \"name\":\"12-pack drill bits\",\n"
+                + " \"description\":\"12-pack of drill bits with sizes ranging from #40 to #3\",\n"
+                + " \"weight\":\"0.8\"\n"
+                + " }\n"
+                + " ],\n"
+                + " \"database\":\"database\",\n"
+                + " \"es\":1589373515000,\n"
+                + " \"id\":3,\n"
+                + " \"isDdl\":false,\n"
+                + " \"mysqlType\":{\n"
+                + " \"id\":\"INTEGER\",\n"
+                + " \"name\":\"VARCHAR(255)\",\n"
+                + " \"description\":\"VARCHAR(512)\",\n"
+                + " \"weight\":\"FLOAT\"\n"
+                + " },\n"
+                + " \"old\":null,\n"
+                + " \"pkNames\":[\n"
+                + " \"id\"\n"
+                + " ],\n"
+                + " \"sql\":\"\",\n"
+                + " \"sqlType\":{\n"
+                + " \"id\":4,\n"
+                + " \"name\":12,\n"
+                + " \"description\":12,\n"
+                + " \"weight\":7\n"
+                + " },\n"
+                + " \"table\":\"table\",\n"
+                + " \"ts\":1589373515477,\n"
+                + " \"type\":\"INSERT\"\n"
+                + "}";
+
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
+        String attrs = "m=0"
+                + "&dt=" + System.currentTimeMillis()
+                + "&iname=" + "tid";
+        inLongMsg.addMsg(attrs, testCanalData.getBytes());
+        SerializedRecord serializedRecord = new SerializedRecord(1, inLongMsg.buildArray());
+
+        FieldInfo[] fieldInfos = new FieldInfo[]{
+                new FieldInfo("id", IntFormatInfo.INSTANCE),
+                new FieldInfo("name", StringFormatInfo.INSTANCE),
+                new FieldInfo("description", StringFormatInfo.INSTANCE),
+                new FieldInfo("weight", FloatFormatInfo.INSTANCE),
+                new BuiltInFieldInfo("database", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
+                new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
+                new BuiltInFieldInfo(
+                        "event-timestamp", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
+                new BuiltInFieldInfo("is-ddl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
+                // The event type is asserted below as the string "+I", so it gets its own field
+                // name and a string format instead of repeating the "is-ddl" boolean declaration.
+                new BuiltInFieldInfo("event-type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
+        };
+
+        DeserializationFunction function = new DeserializationFunction(
+                DeserializationSchemaFactory.build(
+                        fieldInfos,
+                        new CanalDeserializationInfo(null, null, false, "ISO_8601", false)),
+                new FieldMappingTransformer(new Configuration(), fieldInfos),
+                false);
+
+        ListCollector<Row> collector = new ListCollector<>();
+        function.processElement(serializedRecord, null, collector);
+        // Only the first emitted row is checked; it corresponds to the first entry of "data".
+        Row row = collector.getInnerList().get(0);
+
+        Row expected = Row.of(
+                101, "scooter", "Small 2-wheel scooter", 3.14f, "database", "table", 1589373515000L, false, "+I");
+        assertEquals(expected, row);
+    }
+
}