You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not reproduced in this plain-text version.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/01 05:56:13 UTC
[incubator-inlong] 01/01: Revert "[INLONG-2785][Sort]Support extract metadata from data with debezium format and write them to data with canal format (#2796)"
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch revert-2796-master_debezium_to_canal
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 5db5df127429cfdc0b90c80b9f600ee1fb003589
Author: healchow <he...@gmail.com>
AuthorDate: Tue Mar 1 13:56:10 2022 +0800
Revert "[INLONG-2785][Sort]Support extract metadata from data with debezium format and write them to data with canal format (#2796)"
This reverts commit 265f129e0bcd26ffe92322876089293f86a44610.
---
.../inlong/sort/protocol/BuiltInFieldInfo.java | 7 +-
inlong-sort/sort-formats/format-json/pom.xml | 78 ----
.../inlong/sort/formats/json/MysqlBinLogData.java | 60 ---
.../json/canal/CanalJsonDecodingFormat.java | 293 ---------------
.../json/canal/CanalJsonDeserializationSchema.java | 411 ---------------------
.../json/canal/CanalJsonSerializationSchema.java | 195 ----------
.../json/debezium/DebeziumJsonDecodingFormat.java | 286 --------------
.../DebeziumJsonDeserializationSchema.java | 366 ------------------
.../sort/formats/json/debezium/DebeziumUtils.java | 37 --
inlong-sort/sort-formats/pom.xml | 1 -
inlong-sort/sort-single-tenant/pom.xml | 11 +-
.../inlong/sort/singletenant/flink/Entrance.java | 16 +-
.../DebeziumDeserializationSchemaBuilder.java | 93 +----
.../deserialization/DeserializationFunction.java | 17 +-
.../DeserializationSchemaFactory.java | 14 +-
.../deserialization/FieldMappingTransformer.java | 30 +-
.../CanalSerializationSchemaBuilder.java | 70 +---
.../sort/singletenant/flink/utils/CommonUtils.java | 8 -
.../singletenant/flink/DebeziumToCanalITCase.java | 208 -----------
.../DebeziumDeserializationTest.java | 111 +++---
.../FieldMappingTransformerTest.java | 3 +-
.../serialization/CanalSerializationTest.java | 83 -----
22 files changed, 103 insertions(+), 2295 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index 6a25d62..45291fa 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -27,12 +27,7 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
public class BuiltInFieldInfo extends FieldInfo {
public enum BuiltInField {
- DATA_TIME,
- MYSQL_METADATA_DATABASE,
- MYSQL_METADATA_TABLE,
- MYSQL_METADATA_EVENT_TIME,
- MYSQL_METADATA_IS_DDL,
- MYSQL_METADATA_EVENT_TYPE
+ DATA_TIME
}
private static final long serialVersionUID = -3436204467879205139L;
diff --git a/inlong-sort/sort-formats/format-json/pom.xml b/inlong-sort/sort-formats/format-json/pom.xml
deleted file mode 100644
index ab4db83..0000000
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>sort-formats</artifactId>
- <groupId>org.apache.inlong</groupId>
- <version>1.1.0-incubating-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>sort-format-json</artifactId>
- <name>Apache InLong - Sort Format-json</name>
- <packaging>jar</packaging>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-common</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-jackson</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
- <scope>provided</scope>
- </dependency>
-
- </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/MysqlBinLogData.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/MysqlBinLogData.java
deleted file mode 100644
index eaaa9ea..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/MysqlBinLogData.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.flink.table.data.RowData;
-
-public class MysqlBinLogData implements Serializable {
-
- public static final String MYSQL_METADATA_DATABASE = "mysql_metadata_database";
-
- public static final String MYSQL_METADATA_TABLE = "mysql_metadata_table";
-
- public static final String MYSQL_METADATA_EVENT_TIME = "mysql_metadata_event_time";
-
- public static final String MYSQL_METADATA_IS_DDL = "mysql_metadata_is_ddl";
-
- public static final String MYSQL_METADATA_EVENT_TYPE = "mysql_metadata_event_type";
-
- private RowData physicalData;
-
- private Map<String, Object> metadataMap;
-
- public MysqlBinLogData(RowData physicalData, Map<String, Object> metadataMap) {
- this.physicalData = physicalData;
- this.metadataMap = metadataMap;
- }
-
- public RowData getPhysicalData() {
- return physicalData;
- }
-
- public void setPhysicalData(RowData physicalData) {
- this.physicalData = physicalData;
- }
-
- public Map<String, Object> getMetadataMap() {
- return metadataMap;
- }
-
- public void setMetadataMap(Map<String, Object> metadataMap) {
- this.metadataMap = metadataMap;
- }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
deleted file mode 100644
index 343f4e9..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.canal;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDeserializationSchema.MetadataConverter;
-
-/** {@link DecodingFormat} for Canal using JSON encoding. */
-public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
-
- // --------------------------------------------------------------------------------------------
- // Mutable attributes
- // --------------------------------------------------------------------------------------------
-
- private List<String> metadataKeys;
-
- // --------------------------------------------------------------------------------------------
- // Canal-specific attributes
- // --------------------------------------------------------------------------------------------
-
- private final @Nullable String database;
-
- private final @Nullable String table;
-
- private final boolean ignoreParseErrors;
-
- private final TimestampFormat timestampFormat;
-
- public CanalJsonDecodingFormat(
- String database,
- String table,
- boolean ignoreParseErrors,
- TimestampFormat timestampFormat) {
- this.database = database;
- this.table = table;
- this.ignoreParseErrors = ignoreParseErrors;
- this.timestampFormat = timestampFormat;
- this.metadataKeys = Collections.emptyList();
- }
-
- @Override
- public DeserializationSchema<RowData> createRuntimeDecoder(
- DynamicTableSource.Context context, DataType physicalDataType) {
- final List<ReadableMetadata> readableMetadata =
- metadataKeys.stream()
- .map(
- k ->
- Stream.of(ReadableMetadata.values())
- .filter(rm -> rm.key.equals(k))
- .findFirst()
- .orElseThrow(IllegalStateException::new))
- .collect(Collectors.toList());
- final List<DataTypes.Field> metadataFields =
- readableMetadata.stream()
- .map(m -> DataTypes.FIELD(m.key, m.dataType))
- .collect(Collectors.toList());
- final DataType producedDataType =
- DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
- final TypeInformation<RowData> producedTypeInfo =
- context.createTypeInformation(producedDataType);
- return CanalJsonDeserializationSchema.builder(
- physicalDataType, readableMetadata, producedTypeInfo)
- .setDatabase(database)
- .setTable(table)
- .setIgnoreParseErrors(ignoreParseErrors)
- .setTimestampFormat(timestampFormat)
- .build();
- }
-
- @Override
- public Map<String, DataType> listReadableMetadata() {
- final Map<String, DataType> metadataMap = new LinkedHashMap<>();
- Stream.of(ReadableMetadata.values())
- .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
- return metadataMap;
- }
-
- @Override
- public void applyReadableMetadata(List<String> metadataKeys) {
- this.metadataKeys = metadataKeys;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.UPDATE_BEFORE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .addContainedKind(RowKind.DELETE)
- .build();
- }
-
- // --------------------------------------------------------------------------------------------
- // Metadata handling
- // --------------------------------------------------------------------------------------------
-
- /** List of metadata that can be read with this format. */
- public enum ReadableMetadata {
- DATABASE(
- "database",
- DataTypes.STRING().nullable(),
- DataTypes.FIELD("database", DataTypes.STRING()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getString(pos);
- }
-
- @Override
- public Object convert(Object in) {
- return StringData.fromString(in.toString());
- }
- }),
-
- TABLE(
- "table",
- DataTypes.STRING().nullable(),
- DataTypes.FIELD("table", DataTypes.STRING()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getString(pos);
- }
-
- @Override
- public Object convert(Object in) {
- return StringData.fromString(in.toString());
- }
- }),
-
- SQL_TYPE(
- "sql-type",
- DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
- DataTypes.FIELD(
- "sqlType",
- DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getMap(pos);
- }
-
- @Override
- public Object convert(Object in) {
- return new GenericMapData((Map<String, Integer>) in);
- }
- }),
-
- PK_NAMES(
- "pk-names",
- DataTypes.ARRAY(DataTypes.STRING()).nullable(),
- DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getArray(pos);
- }
-
- @Override
- public Object convert(Object in) {
- return new GenericArrayData((Object[]) in);
- }
- }),
-
- INGESTION_TIMESTAMP(
- "ingestion-timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
- DataTypes.FIELD("ts", DataTypes.BIGINT()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- if (row.isNullAt(pos)) {
- return null;
- }
- return TimestampData.fromEpochMillis(row.getLong(pos));
- }
-
- @Override
- public Object convert(Object in) {
- return in;
- }
- }),
-
- EVENT_TIMESTAMP(
- "event-timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
- DataTypes.FIELD("es", DataTypes.BIGINT()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- if (row.isNullAt(pos)) {
- return null;
- }
- return TimestampData.fromEpochMillis(row.getLong(pos));
- }
-
- @Override
- public Object convert(Object in) {
- return in;
- }
- }),
-
- IS_DDL(
- "is-ddl",
- DataTypes.BOOLEAN().nullable(),
- DataTypes.FIELD("isDdl", DataTypes.BOOLEAN()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- if (row.isNullAt(pos)) {
- return null;
- }
- return row.getBoolean(pos);
- }
-
- @Override
- public Object convert(Object in) {
- return in;
- }
- });
-
- final String key;
-
- final DataType dataType;
-
- final DataTypes.Field requiredJsonField;
-
- final MetadataConverter converter;
-
- ReadableMetadata(
- String key,
- DataType dataType,
- DataTypes.Field requiredJsonField,
- MetadataConverter converter) {
- this.key = key;
- this.dataType = dataType;
- this.requiredJsonField = requiredJsonField;
- this.converter = converter;
- }
- }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
deleted file mode 100644
index 089debc..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.canal;
-
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Collector;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
-
-/**
- * Deserialization schema from Canal JSON to Flink Table/SQL internal data structure {@link
- * RowData}. The deserialization schema knows Canal's schema definition and can extract the database
- * data and convert into {@link RowData} with {@link RowKind}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
- */
-public final class CanalJsonDeserializationSchema implements DeserializationSchema<RowData> {
- private static final long serialVersionUID = 1L;
-
- private static final String FIELD_OLD = "old";
- private static final String OP_INSERT = "INSERT";
- private static final String OP_UPDATE = "UPDATE";
- private static final String OP_DELETE = "DELETE";
- private static final String OP_CREATE = "CREATE";
-
- /** The deserializer to deserialize Canal JSON data. */
- private final JsonRowDataDeserializationSchema jsonDeserializer;
-
- /** Flag that indicates that an additional projection is required for metadata. */
- private final boolean hasMetadata;
-
- /** Metadata to be extracted for every record. */
- private final MetadataConverter[] metadataConverters;
-
- /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
- private final TypeInformation<RowData> producedTypeInfo;
-
- /** Only read changelogs from the specific database. */
- private final @Nullable String database;
-
- /** Only read changelogs from the specific table. */
- private final @Nullable String table;
-
- /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
- private final boolean ignoreParseErrors;
-
- /** Names of fields. */
- private final List<String> fieldNames;
-
- /** Number of fields. */
- private final int fieldCount;
-
- /** Pattern of the specific database. */
- private final Pattern databasePattern;
-
- /** Pattern of the specific table. */
- private final Pattern tablePattern;
-
- private CanalJsonDeserializationSchema(
- DataType physicalDataType,
- List<ReadableMetadata> requestedMetadata,
- TypeInformation<RowData> producedTypeInfo,
- @Nullable String database,
- @Nullable String table,
- boolean ignoreParseErrors,
- TimestampFormat timestampFormat) {
- final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
- this.jsonDeserializer =
- new JsonRowDataDeserializationSchema(
- jsonRowType,
- // the result type is never used, so it's fine to pass in the produced type
- // info
- producedTypeInfo,
- false, // ignoreParseErrors already contains the functionality of
- // failOnMissingField
- ignoreParseErrors,
- timestampFormat);
- this.hasMetadata = requestedMetadata.size() > 0;
- this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
- this.producedTypeInfo = producedTypeInfo;
- this.database = database;
- this.table = table;
- this.ignoreParseErrors = ignoreParseErrors;
- final RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
- this.fieldNames = physicalRowType.getFieldNames();
- this.fieldCount = physicalRowType.getFieldCount();
- this.databasePattern = database == null ? null : Pattern.compile(database);
- this.tablePattern = table == null ? null : Pattern.compile(table);
- }
-
- // ------------------------------------------------------------------------------------------
- // Builder
- // ------------------------------------------------------------------------------------------
-
- /** Creates A builder for building a {@link CanalJsonDeserializationSchema}. */
- public static Builder builder(
- DataType physicalDataType,
- List<ReadableMetadata> requestedMetadata,
- TypeInformation<RowData> producedTypeInfo) {
- return new Builder(physicalDataType, requestedMetadata, producedTypeInfo);
- }
-
- /** A builder for creating a {@link CanalJsonDeserializationSchema}. */
- @Internal
- public static final class Builder {
- private final DataType physicalDataType;
- private final List<ReadableMetadata> requestedMetadata;
- private final TypeInformation<RowData> producedTypeInfo;
- private String database = null;
- private String table = null;
- private boolean ignoreParseErrors = false;
- private TimestampFormat timestampFormat = TimestampFormat.SQL;
-
- private Builder(
- DataType physicalDataType,
- List<ReadableMetadata> requestedMetadata,
- TypeInformation<RowData> producedTypeInfo) {
- this.physicalDataType = physicalDataType;
- this.requestedMetadata = requestedMetadata;
- this.producedTypeInfo = producedTypeInfo;
- }
-
- public Builder setDatabase(String database) {
- this.database = database;
- return this;
- }
-
- public Builder setTable(String table) {
- this.table = table;
- return this;
- }
-
- public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
- this.ignoreParseErrors = ignoreParseErrors;
- return this;
- }
-
- public Builder setTimestampFormat(TimestampFormat timestampFormat) {
- this.timestampFormat = timestampFormat;
- return this;
- }
-
- public CanalJsonDeserializationSchema build() {
- return new CanalJsonDeserializationSchema(
- physicalDataType,
- requestedMetadata,
- producedTypeInfo,
- database,
- table,
- ignoreParseErrors,
- timestampFormat);
- }
- }
-
- // ------------------------------------------------------------------------------------------
-
- @Override
- public RowData deserialize(byte[] message) throws IOException {
- throw new RuntimeException(
- "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
- }
-
- @Override
- public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
- if (message == null || message.length == 0) {
- return;
- }
- try {
- final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
- if (database != null) {
- if (!databasePattern
- .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
- .matches()) {
- return;
- }
- }
- if (table != null) {
- if (!tablePattern
- .matcher(root.get(ReadableMetadata.TABLE.key).asText())
- .matches()) {
- return;
- }
- }
- final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
- String type = row.getString(2).toString(); // "type" field
- if (OP_INSERT.equals(type)) {
- // "data" field is an array of row, contains inserted rows
- ArrayData data = row.getArray(0);
- for (int i = 0; i < data.size(); i++) {
- GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
- insert.setRowKind(RowKind.INSERT);
- emitRow(row, insert, out);
- }
- } else if (OP_UPDATE.equals(type)) {
- // "data" field is an array of row, contains new rows
- ArrayData data = row.getArray(0);
- // "old" field is an array of row, contains old values
- ArrayData old = row.getArray(1);
- for (int i = 0; i < data.size(); i++) {
- // the underlying JSON deserialization schema always produce GenericRowData.
- GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
- GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
- final JsonNode oldField = root.get(FIELD_OLD);
- for (int f = 0; f < fieldCount; f++) {
- if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
- // fields in "old" (before) means the fields are changed
- // fields not in "old" (before) means the fields are not changed
- // so we just copy the not changed fields into before
- before.setField(f, after.getField(f));
- }
- }
- before.setRowKind(RowKind.UPDATE_BEFORE);
- after.setRowKind(RowKind.UPDATE_AFTER);
- emitRow(row, before, out);
- emitRow(row, after, out);
- }
- } else if (OP_DELETE.equals(type)) {
- // "data" field is an array of row, contains deleted rows
- ArrayData data = row.getArray(0);
- for (int i = 0; i < data.size(); i++) {
- GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
- insert.setRowKind(RowKind.DELETE);
- emitRow(row, insert, out);
- }
- } else if (OP_CREATE.equals(type)) {
- // "data" field is null and "type" is "CREATE" which means
- // this is a DDL change event, and we should skip it.
- return;
- } else {
- if (!ignoreParseErrors) {
- throw new IOException(
- format(
- "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
- type, new String(message)));
- }
- }
- } catch (Throwable t) {
- // a big try catch to protect the processing.
- if (!ignoreParseErrors) {
- throw new IOException(
- format("Corrupt Canal JSON message '%s'.", new String(message)), t);
- }
- }
- }
-
- private void emitRow(
- GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
- // shortcut in case no output projection is required
- if (!hasMetadata) {
- out.collect(physicalRow);
- return;
- }
- final int physicalArity = physicalRow.getArity();
- final int metadataArity = metadataConverters.length;
- final GenericRowData producedRow =
- new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
- for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
- producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
- }
- for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
- producedRow.setField(
- physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
- }
- out.collect(producedRow);
- }
-
- @Override
- public boolean isEndOfStream(RowData nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return producedTypeInfo;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CanalJsonDeserializationSchema that = (CanalJsonDeserializationSchema) o;
- return Objects.equals(jsonDeserializer, that.jsonDeserializer)
- && hasMetadata == that.hasMetadata
- && Objects.equals(producedTypeInfo, that.producedTypeInfo)
- && Objects.equals(database, that.database)
- && Objects.equals(table, that.table)
- && ignoreParseErrors == that.ignoreParseErrors
- && fieldCount == that.fieldCount;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- jsonDeserializer,
- hasMetadata,
- producedTypeInfo,
- database,
- table,
- ignoreParseErrors,
- fieldCount);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static RowType createJsonRowType(
- DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
- // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
- DataType root =
- DataTypes.ROW(
- DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
- DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)),
- DataTypes.FIELD("type", DataTypes.STRING()),
- ReadableMetadata.DATABASE.requiredJsonField,
- ReadableMetadata.TABLE.requiredJsonField);
- // append fields that are required for reading metadata in the root
- final List<DataTypes.Field> rootMetadataFields =
- readableMetadata.stream()
- .filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE)
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
- }
-
- private static MetadataConverter[] createMetadataConverters(
- RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
- return requestedMetadata.stream()
- .map(m -> convert(jsonRowType, m))
- .toArray(MetadataConverter[]::new);
- }
-
- private static MetadataConverter convert(RowType jsonRowType, ReadableMetadata metadata) {
- final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
- return new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData root, int unused) {
- return metadata.converter.convert(root, pos);
- }
-
- @Override
- public Object convert(Object in) {
- return metadata.converter.convert(in);
- }
- };
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Converter that extracts a metadata field from the row that comes out of the JSON schema and
- * converts it to the desired data type.
- */
- interface MetadataConverter extends Serializable {
-
- // Method for top-level access.
- default Object convert(GenericRowData row) {
- return convert(row, -1);
- }
-
- Object convert(GenericRowData row, int pos);
-
- Object convert(Object in);
- }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
deleted file mode 100644
index faf1cba..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.canal;
-
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.DataTypes.Field;
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
-
-/**
- * Copied from apache flink project with a litter change.
- *
- * Serialization schema that serializes an object of Flink Table/SQL internal data structure {@link
- * RowData} into a Canal JSON bytes.
- *
- * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
- */
-public class CanalJsonSerializationSchema implements SerializationSchema<RowData> {
-
- private static final long serialVersionUID = 1L;
-
- private static final StringData OP_INSERT = StringData.fromString("INSERT");
- private static final StringData OP_DELETE = StringData.fromString("DELETE");
-
- private transient GenericRowData reuse;
-
- /** The serializer to serialize Canal JSON data. */
- private final JsonRowDataSerializationSchema jsonSerializer;
-
- private final DataFormatConverters.RowConverter consumedRowConverter;
-
- private final DataFormatConverters.RowConverter physicalRowConverter;
-
- private final Map<Integer, ReadableMetadata> fieldIndexToMetadata;
-
- public CanalJsonSerializationSchema(
- RowType physicalRowType,
- Map<Integer, ReadableMetadata> fieldIndexToMetadata,
- DataFormatConverters.RowConverter consumedRowConverter,
- DataFormatConverters.RowConverter physicalRowConverter,
- TimestampFormat timestampFormat,
- JsonOptions.MapNullKeyMode mapNullKeyMode,
- String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
- jsonSerializer =
- new JsonRowDataSerializationSchema(
- createJsonRowType(fromLogicalToDataType(physicalRowType), fieldIndexToMetadata.values()),
- timestampFormat,
- mapNullKeyMode,
- mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
-
- this.fieldIndexToMetadata = fieldIndexToMetadata;
- this.consumedRowConverter = consumedRowConverter;
- this.physicalRowConverter = physicalRowConverter;
- }
-
- @Override
- public void open(InitializationContext context) {
- reuse = new GenericRowData(2 + fieldIndexToMetadata.size());
- }
-
- @Override
- public byte[] serialize(RowData row) {
- try {
- MysqlBinLogData mysqlBinLogData = getMysqlBinLongData(row);
-
- ArrayData arrayData = new GenericArrayData(new RowData[] {mysqlBinLogData.getPhysicalData()});
- reuse.setField(0, arrayData);
- reuse.setField(1, rowKind2String(row.getRowKind()));
-
- // Set metadata
- Map<String, Object> metadataMap = mysqlBinLogData.getMetadataMap();
- int index = 2;
- for (ReadableMetadata readableMetadata : fieldIndexToMetadata.values()) {
- reuse.setField(index, metadataMap.get(readableMetadata.key));
- index++;
- }
-
- return jsonSerializer.serialize(reuse);
- } catch (Throwable t) {
- throw new RuntimeException("Could not serialize row '" + row + "'.", t);
- }
- }
-
- private StringData rowKind2String(RowKind rowKind) {
- switch (rowKind) {
- case INSERT:
- case UPDATE_AFTER:
- return OP_INSERT;
- case UPDATE_BEFORE:
- case DELETE:
- return OP_DELETE;
- default:
- throw new UnsupportedOperationException(
- "Unsupported operation '" + rowKind + "' for row kind.");
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CanalJsonSerializationSchema that = (CanalJsonSerializationSchema) o;
- return Objects.equals(jsonSerializer, that.jsonSerializer);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(jsonSerializer);
- }
-
- private static RowType createJsonRowType(
- DataType dataSchema,
- Collection<ReadableMetadata> metadataSet) {
- DataType root = DataTypes.ROW(
- DataTypes.FIELD("data", DataTypes.ARRAY(dataSchema)),
- DataTypes.FIELD("type", DataTypes.STRING())
- );
-
- final List<Field> metadataFields =
- metadataSet.stream()
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
-
- return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
- }
-
- private MysqlBinLogData getMysqlBinLongData(RowData consumedRowData) {
- Row consumedRow = consumedRowConverter.toExternal(consumedRowData);
- int consumedRowArity = consumedRow.getArity();
- Set<Integer> metadataIndices = fieldIndexToMetadata.keySet();
-
- Row physicalRow = new Row(consumedRowArity - metadataIndices.size());
- Map<String, Object> metadataMap = new HashMap<>();
- int physicalRowDataIndex = 0;
- for (int i = 0; i < consumedRowArity; i++) {
- if (!metadataIndices.contains(i)) {
- physicalRow.setField(physicalRowDataIndex, consumedRow.getField(i));
- physicalRowDataIndex++;
- } else {
- metadataMap.put(
- fieldIndexToMetadata.get(i).key,
- fieldIndexToMetadata.get(i).converter.convert(consumedRow.getField(i)));
- }
- }
-
- return new MysqlBinLogData(physicalRowConverter.toInternal(physicalRow), metadataMap);
- }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDecodingFormat.java
deleted file mode 100644
index ca51860..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.debezium;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
-
-/**
- * Copied from apache flink project with a litter change.
- *
- * {@link DecodingFormat} for Debezium using JSON encoding.
- **/
-public class DebeziumJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
-
- // --------------------------------------------------------------------------------------------
- // Mutable attributes
- // --------------------------------------------------------------------------------------------
-
- private List<String> metadataKeys;
-
- // --------------------------------------------------------------------------------------------
- // Debezium-specific attributes
- // --------------------------------------------------------------------------------------------
-
- private final boolean schemaInclude;
-
- private final boolean ignoreParseErrors;
-
- private final TimestampFormat timestampFormat;
-
- public DebeziumJsonDecodingFormat(
- boolean schemaInclude, boolean ignoreParseErrors, TimestampFormat timestampFormat) {
- this.schemaInclude = schemaInclude;
- this.ignoreParseErrors = ignoreParseErrors;
- this.timestampFormat = timestampFormat;
- this.metadataKeys = Collections.emptyList();
- }
-
- @Override
- public DeserializationSchema<RowData> createRuntimeDecoder(
- DynamicTableSource.Context context, DataType physicalDataType) {
-
- final List<ReadableMetadata> readableMetadata =
- metadataKeys.stream()
- .map(
- k ->
- Stream.of(ReadableMetadata.values())
- .filter(rm -> rm.key.equals(k))
- .findFirst()
- .orElseThrow(IllegalStateException::new))
- .collect(Collectors.toList());
-
- final List<DataTypes.Field> metadataFields =
- readableMetadata.stream()
- .map(m -> DataTypes.FIELD(m.key, m.dataType))
- .collect(Collectors.toList());
-
- final DataType producedDataType =
- DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
-
- final TypeInformation<RowData> producedTypeInfo =
- context.createTypeInformation(producedDataType);
-
- return new DebeziumJsonDeserializationSchema(
- physicalDataType,
- readableMetadata,
- producedTypeInfo,
- schemaInclude,
- ignoreParseErrors,
- timestampFormat);
- }
-
- @Override
- public Map<String, DataType> listReadableMetadata() {
- final Map<String, DataType> metadataMap = new LinkedHashMap<>();
- Stream.of(ReadableMetadata.values())
- .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
- return metadataMap;
- }
-
- @Override
- public void applyReadableMetadata(List<String> metadataKeys) {
- this.metadataKeys = metadataKeys;
- }
-
- @Override
- public ChangelogMode getChangelogMode() {
- return ChangelogMode.newBuilder()
- .addContainedKind(RowKind.INSERT)
- .addContainedKind(RowKind.UPDATE_BEFORE)
- .addContainedKind(RowKind.UPDATE_AFTER)
- .addContainedKind(RowKind.DELETE)
- .build();
- }
-
- // --------------------------------------------------------------------------------------------
- // Metadata handling
- // --------------------------------------------------------------------------------------------
-
- /** List of metadata that can be read with this format. */
- public enum ReadableMetadata {
- SCHEMA(
- "schema",
- DataTypes.STRING().nullable(),
- false,
- DataTypes.FIELD("schema", DataTypes.STRING()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getString(pos);
- }
- }),
-
- INGESTION_TIMESTAMP(
- "ingestion-timestamp",
- DataTypes.BIGINT().nullable(),
- true,
- DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getLong(pos);
- }
- }),
-
- SOURCE_TIMESTAMP(
- "source.timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- final StringData timestamp =
- (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
- if (timestamp == null) {
- return null;
- }
- return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
- }
- }),
-
- SOURCE_DATABASE(
- "source.database",
- DataTypes.STRING().nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return readProperty(row, pos, KEY_SOURCE_DATABASE);
- }
- }),
-
- SOURCE_SCHEMA(
- "source.schema",
- DataTypes.STRING().nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return readProperty(row, pos, KEY_SOURCE_SCHEMA);
- }
- }),
-
- SOURCE_TABLE(
- "source.table",
- DataTypes.STRING().nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return readProperty(row, pos, KEY_SOURCE_TABLE);
- }
- }),
-
- SOURCE_PROPERTIES(
- "source.properties",
- // key and value of the map are nullable to make handling easier in queries
- DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
- .nullable(),
- true,
- DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
- new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData row, int pos) {
- return row.getMap(pos);
- }
- });
-
- final String key;
-
- final DataType dataType;
-
- final boolean isJsonPayload;
-
- final DataTypes.Field requiredJsonField;
-
- final MetadataConverter converter;
-
- ReadableMetadata(
- String key,
- DataType dataType,
- boolean isJsonPayload,
- DataTypes.Field requiredJsonField,
- MetadataConverter converter) {
- this.key = key;
- this.dataType = dataType;
- this.isJsonPayload = isJsonPayload;
- this.requiredJsonField = requiredJsonField;
- this.converter = converter;
- }
-
- public String getKey() {
- return key;
- }
- }
-
- private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");
-
- private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");
-
- private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");
-
- private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
-
- private static Object readProperty(GenericRowData row, int pos, StringData key) {
- final GenericMapData map = (GenericMapData) row.getMap(pos);
- if (map == null) {
- return null;
- }
- return map.get(key);
- }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
deleted file mode 100644
index 8fea979..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.debezium;
-
-import static java.lang.String.format;
-import static org.apache.inlong.sort.formats.json.debezium.DebeziumUtils.getMysqlMetadataKey;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Collector;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-
-/**
- * Copied from apache flink project with a litter change.
- *
- * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure {@link
- * RowData}. The deserialization schema knows Debezium's schema definition and can extract the
- * database data and convert into {@link RowData} with {@link RowKind}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @see <a href="https://debezium.io/">Debezium</a>
- */
-@Internal
-public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
- private static final long serialVersionUID = 1L;
-
- private static final String OP_READ = "r"; // snapshot read
- private static final String OP_CREATE = "c"; // insert
- private static final String OP_UPDATE = "u"; // update
- private static final String OP_DELETE = "d"; // delete
-
- private static final String REPLICA_IDENTITY_EXCEPTION =
- "The \"before\" field of %s message is null, "
- + "if you are using Debezium Postgres Connector, "
- + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
-
- /** The deserializer to deserialize Debezium JSON data. */
- private final JsonRowDataDeserializationSchema jsonDeserializer;
-
- /** Flag that indicates that an additional projection is required for metadata. */
- private final boolean hasMetadata;
-
- /** Metadata to be extracted for every record. */
- private final MetadataConverter[] metadataConverters;
-
- private final List<ReadableMetadata> requestedMetadata;
-
- /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
- private final TypeInformation<RowData> producedTypeInfo;
-
- /**
- * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
- * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
- * information, but we just ignore "schema" and extract data from "payload".
- */
- private final boolean schemaInclude;
-
- /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
- private final boolean ignoreParseErrors;
-
- public DebeziumJsonDeserializationSchema(
- DataType physicalDataType,
- List<ReadableMetadata> requestedMetadata,
- TypeInformation<RowData> producedTypeInfo,
- boolean schemaInclude,
- boolean ignoreParseErrors,
- TimestampFormat timestampFormat) {
- final RowType jsonRowType =
- createJsonRowType(physicalDataType, requestedMetadata, schemaInclude);
- this.jsonDeserializer =
- new JsonRowDataDeserializationSchema(
- jsonRowType,
- // the result type is never used, so it's fine to pass in the produced type
- // info
- producedTypeInfo,
- false, // ignoreParseErrors already contains the functionality of
- // failOnMissingField
- ignoreParseErrors,
- timestampFormat);
- this.hasMetadata = requestedMetadata.size() > 0;
- this.metadataConverters =
- createMetadataConverters(jsonRowType, requestedMetadata, schemaInclude);
- this.requestedMetadata = requestedMetadata;
- this.producedTypeInfo = producedTypeInfo;
- this.schemaInclude = schemaInclude;
- this.ignoreParseErrors = ignoreParseErrors;
- }
-
- @Override
- public RowData deserialize(byte[] message) {
- throw new RuntimeException(
- "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
- }
-
- @Override
- public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
- if (message == null || message.length == 0) {
- // skip tombstone messages
- return;
- }
- try {
- GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
- GenericRowData payload;
- if (schemaInclude) {
- payload = (GenericRowData) row.getField(0);
- } else {
- payload = row;
- }
-
- GenericRowData before = (GenericRowData) payload.getField(0);
- GenericRowData after = (GenericRowData) payload.getField(1);
- String op = payload.getField(2).toString();
- if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
- after.setRowKind(RowKind.INSERT);
- emitRow(row, after, out);
- } else if (OP_UPDATE.equals(op)) {
- if (before == null) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
- }
- before.setRowKind(RowKind.UPDATE_BEFORE);
- after.setRowKind(RowKind.UPDATE_AFTER);
- emitRow(row, before, out);
- emitRow(row, after, out);
- } else if (OP_DELETE.equals(op)) {
- if (before == null) {
- throw new IllegalStateException(
- String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
- }
- before.setRowKind(RowKind.DELETE);
- emitRow(row, before, out);
- } else {
- if (!ignoreParseErrors) {
- throw new IOException(
- format(
- "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
- op, new String(message)));
- }
- }
- } catch (Throwable t) {
- // a big try catch to protect the processing.
- if (!ignoreParseErrors) {
- throw new IOException(
- format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
- }
- }
- }
-
- private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
- final int physicalArity = physicalRow.getArity();
- final int metadataArity = metadataConverters.length;
-
- final GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + 1);
-
- for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
- producedRow.setField(physicalPos + 1, physicalRow.getField(physicalPos));
- }
-
- // Put metadata in the first field of the emitted RowData
- Map<StringData, StringData> metadataMap = new HashMap<>();
- metadataMap.put(
- StringData.fromString(MysqlBinLogData.MYSQL_METADATA_IS_DDL),
- StringData.fromString("false"));
-
- switch (physicalRow.getRowKind()) {
- case INSERT:
- // fall through
- case UPDATE_AFTER:
- metadataMap.put(
- StringData.fromString(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE),
- StringData.fromString(OP_CREATE));
- break;
- case UPDATE_BEFORE:
- // fall through
- case DELETE:
- metadataMap.put(
- StringData.fromString(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE),
- StringData.fromString(OP_DELETE));
- break;
- }
- for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
- metadataMap.put(
- StringData.fromString(getMysqlMetadataKey(requestedMetadata.get(metadataPos))),
- StringData.fromString(metadataConverters[metadataPos].convert(rootRow).toString())
- );
- }
- producedRow.setField(0, new GenericMapData(metadataMap));
-
- out.collect(producedRow);
- }
-
- @Override
- public boolean isEndOfStream(RowData nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return producedTypeInfo;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
- return Objects.equals(jsonDeserializer, that.jsonDeserializer)
- && hasMetadata == that.hasMetadata
- && Objects.equals(producedTypeInfo, that.producedTypeInfo)
- && schemaInclude == that.schemaInclude
- && ignoreParseErrors == that.ignoreParseErrors;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- jsonDeserializer, hasMetadata, producedTypeInfo, schemaInclude, ignoreParseErrors);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static RowType createJsonRowType(
- DataType physicalDataType,
- List<ReadableMetadata> readableMetadata,
- boolean schemaInclude) {
- DataType payload =
- DataTypes.ROW(
- DataTypes.FIELD("before", physicalDataType),
- DataTypes.FIELD("after", physicalDataType),
- DataTypes.FIELD("op", DataTypes.STRING()));
-
- // append fields that are required for reading metadata in the payload
- final List<DataTypes.Field> payloadMetadataFields =
- readableMetadata.stream()
- .filter(m -> m.isJsonPayload)
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
-
- DataType root = payload;
- if (schemaInclude) {
- // when Debezium Kafka connect enables "value.converter.schemas.enable",
- // the JSON will contain "schema" information and we need to extract data from
- // "payload".
- root = DataTypes.ROW(DataTypes.FIELD("payload", payload));
- }
-
- // append fields that are required for reading metadata in the root
- final List<DataTypes.Field> rootMetadataFields =
- readableMetadata.stream()
- .filter(m -> !m.isJsonPayload)
- .map(m -> m.requiredJsonField)
- .distinct()
- .collect(Collectors.toList());
- root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
-
- return (RowType) root.getLogicalType();
- }
-
- private static MetadataConverter[] createMetadataConverters(
- RowType jsonRowType, List<ReadableMetadata> requestedMetadata, boolean schemaInclude) {
- return requestedMetadata.stream()
- .map(
- m -> {
- if (m.isJsonPayload) {
- return convertInPayload(jsonRowType, m, schemaInclude);
- } else {
- return convertInRoot(jsonRowType, m);
- }
- })
- .toArray(MetadataConverter[]::new);
- }
-
- private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
- final int pos = findFieldPos(metadata, jsonRowType);
- return new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData root, int unused) {
- return metadata.converter.convert(root, pos);
- }
- };
- }
-
- private static MetadataConverter convertInPayload(
- RowType jsonRowType, ReadableMetadata metadata, boolean schemaInclude) {
- if (schemaInclude) {
- final int pos = findFieldPos(metadata, (RowType) jsonRowType.getChildren().get(0));
- return new MetadataConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(GenericRowData root, int unused) {
- final GenericRowData payload = (GenericRowData) root.getField(0);
- return metadata.converter.convert(payload, pos);
- }
- };
- }
- return convertInRoot(jsonRowType, metadata);
- }
-
- private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
- return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Converter that extracts a metadata field from the row (root or payload) that comes out of the
- * JSON schema and converts it to the desired data type.
- */
- interface MetadataConverter extends Serializable {
-
- // Method for top-level access.
- default Object convert(GenericRowData row) {
- return convert(row, -1);
- }
-
- Object convert(GenericRowData row, int pos);
- }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java
deleted file mode 100644
index f95a3be..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.debezium;
-
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-
-public class DebeziumUtils {
-
- public static String getMysqlMetadataKey(ReadableMetadata readableMetadata) {
- switch (readableMetadata) {
- case SOURCE_DATABASE:
- return MysqlBinLogData.MYSQL_METADATA_DATABASE;
- case SOURCE_TABLE:
- return MysqlBinLogData.MYSQL_METADATA_TABLE;
- case INGESTION_TIMESTAMP:
- return MysqlBinLogData.MYSQL_METADATA_EVENT_TIME;
- default:
- throw new IllegalArgumentException("Not supported yet");
- }
- }
-}
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 4240bd8..2c2f88a 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -56,7 +56,6 @@
<module>format-kv</module>
<module>format-inlongmsg-base</module>
<module>format-inlongmsg-csv</module>
- <module>format-json</module>
</modules>
<properties>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 5bcae07..373674b 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -60,12 +60,6 @@
</dependency>
<dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-json</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
@@ -112,6 +106,11 @@
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
</dependency>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 75bdf71..a557b2d 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -24,6 +24,7 @@ import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuild
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -38,9 +39,9 @@ import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.flink.hive.HiveCommitter;
import org.apache.inlong.sort.flink.hive.HiveWriter;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
import org.apache.inlong.sort.protocol.sink.IcebergSinkInfo;
@@ -126,19 +127,20 @@ public class Entrance {
) throws IOException, ClassNotFoundException {
FieldInfo[] sourceFields = sourceInfo.getFields();
DeserializationSchema<Row> schema = DeserializationSchemaFactory.build(
- sourceFields, sourceInfo.getDeserializationInfo());
+ extractNonBuiltInFieldInfo(sourceFields), sourceInfo.getDeserializationInfo());
FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(config, sourceFields);
-
- DeserializationFunction function = new DeserializationFunction(
- schema,
- fieldMappingTransformer,
- !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo));
+ DeserializationFunction function = new DeserializationFunction(schema, fieldMappingTransformer);
return sourceStream.process(function)
.uid(Constants.DESERIALIZATION_SCHEMA_UID)
.name("Deserialization")
.setParallelism(config.getInteger(Constants.DESERIALIZATION_PARALLELISM));
}
+ private static FieldInfo[] extractNonBuiltInFieldInfo(FieldInfo[] fieldInfos) {
+ return Arrays.stream(fieldInfos).filter(fieldInfo -> !(fieldInfo instanceof BuiltInFieldInfo)).toArray(
+ FieldInfo[]::new);
+ }
+
private static void buildSinkStream(
DataStream<Row> sourceStream,
Configuration config,
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
index 8f5d177..b2fc1bb 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
@@ -18,35 +18,25 @@
package org.apache.inlong.sort.singletenant.flink.deserialization;
-import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
-import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
public class DebeziumDeserializationSchemaBuilder {
@@ -55,18 +45,10 @@ public class DebeziumDeserializationSchemaBuilder {
DebeziumDeserializationInfo deserializationInfo
) throws IOException, ClassNotFoundException {
TimestampFormat timestampFormat = getTimestampFormatStandard(deserializationInfo.getTimestampFormatStandard());
- DebeziumJsonDecodingFormat debeziumJsonDecodingFormat = new DebeziumJsonDecodingFormat(
- false, deserializationInfo.isIgnoreParseErrors(), timestampFormat);
-
- // Extract required metadata
- FieldInfo[] metadataFieldInfos = getMetadataFieldInfos(fieldInfos);
- List<String> requiredMetadataKeys = Arrays.stream(metadataFieldInfos)
- .map(FieldInfo::getName)
- .collect(Collectors.toList());
- debeziumJsonDecodingFormat.applyReadableMetadata(requiredMetadataKeys);
+ DebeziumJsonDecodingFormat debeziumJsonDecodingFormat =
+ new DebeziumJsonDecodingFormat(false, deserializationInfo.isIgnoreParseErrors(), timestampFormat);
- FieldInfo[] originPhysicalFieldInfos = CommonUtils.extractNonBuiltInFieldInfos(fieldInfos);
- FieldInfo[] convertedPhysicalFieldInfos = convertDateToStringFormatInfo(originPhysicalFieldInfos);
+ FieldInfo[] convertedInputFields = convertDateToStringFormatInfo(fieldInfos);
DeserializationSchema<RowData> debeziumSchema = debeziumJsonDecodingFormat.createRuntimeDecoder(
new DynamicTableSource.Context() {
@Override
@@ -80,55 +62,12 @@ public class DebeziumDeserializationSchemaBuilder {
return null;
}
},
- convertFieldInfosToDataType(convertedPhysicalFieldInfos)
+ convertFieldInfosToDataType(convertedInputFields)
);
- RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema = new RowDataToRowDeserializationSchemaWrapper(
- debeziumSchema,
- getProducedFieldInfos(convertedPhysicalFieldInfos));
- return new CustomDateFormatDeserializationSchemaWrapper(
- rowDataToRowSchema,
- extractFormatInfos(getProducedFieldInfos(originPhysicalFieldInfos)));
+ RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema =
+ new RowDataToRowDeserializationSchemaWrapper(debeziumSchema, convertedInputFields);
+ return new CustomDateFormatDeserializationSchemaWrapper(rowDataToRowSchema, extractFormatInfos(fieldInfos));
}
- public static FieldInfo[] getMetadataFieldInfos(FieldInfo[] fieldInfos) {
- List<FieldInfo> metadataFieldInfos = new ArrayList<>();
- Arrays.stream(fieldInfos)
- .filter(fieldInfo -> fieldInfo instanceof BuiltInFieldInfo)
- .forEach(fieldInfo -> {
- BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) fieldInfo;
- BuiltInField builtInField = builtInFieldInfo.getBuiltInField();
- switch (builtInField) {
- case MYSQL_METADATA_DATABASE:
- metadataFieldInfos.add(new FieldInfo(
- ReadableMetadata.SOURCE_DATABASE.getKey(), StringFormatInfo.INSTANCE));
- break;
- case MYSQL_METADATA_TABLE:
- metadataFieldInfos.add(new FieldInfo(
- ReadableMetadata.SOURCE_TABLE.getKey(), StringFormatInfo.INSTANCE));
- break;
- case MYSQL_METADATA_EVENT_TIME:
- metadataFieldInfos.add(new FieldInfo(
- ReadableMetadata.INGESTION_TIMESTAMP.getKey(), LongFormatInfo.INSTANCE));
- break;
- case MYSQL_METADATA_IS_DDL:
- case MYSQL_METADATA_EVENT_TYPE:
- break;
- default:
- throw new IllegalArgumentException(
- "Unsupported builtin field '" + builtInField + "' in debezium deserialization");
- }
- });
-
- return metadataFieldInfos.toArray(new FieldInfo[0]);
- }
-
- public static FieldInfo[] getProducedFieldInfos(FieldInfo[] physicalFieldInfos) {
- List<FieldInfo> results = new ArrayList<>();
- results.add(new FieldInfo(
- "metadata",
- new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE)));
- results.addAll(Arrays.asList(physicalFieldInfos));
- return results.toArray(new FieldInfo[0]);
- }
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
index e9bec47..7c1414a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.singletenant.flink.deserialization;
-import java.util.HashMap;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
@@ -35,23 +34,12 @@ public class DeserializationFunction extends ProcessFunction<SerializedRecord, R
private final FieldMappingTransformer fieldMappingTransformer;
- private final boolean appendAttributes;
-
public DeserializationFunction(
DeserializationSchema<Row> deserializationSchema,
- FieldMappingTransformer fieldMappingTransformer,
- boolean appendAttributes
+ FieldMappingTransformer fieldMappingTransformer
) {
this.deserializationSchema = deserializationSchema;
this.fieldMappingTransformer = fieldMappingTransformer;
- this.appendAttributes = appendAttributes;
- }
-
- public DeserializationFunction(
- DeserializationSchema<Row> deserializationSchema,
- FieldMappingTransformer fieldMappingTransformer
- ) {
- this(deserializationSchema, fieldMappingTransformer, true);
}
@Override
@@ -76,9 +64,6 @@ public class DeserializationFunction extends ProcessFunction<SerializedRecord, R
}
deserializationSchema.deserialize(bodyBytes, new CallbackCollector<>(inputRow -> {
- if (appendAttributes) {
- inputRow = Row.join(Row.of(new HashMap<>()), inputRow);
- }
out.collect(fieldMappingTransformer.transform(inputRow, value.getTimestampMillis()));
}));
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
index 9a3cd74..a085a1b 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
@@ -33,7 +33,6 @@ import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
@@ -46,19 +45,16 @@ public class DeserializationSchemaFactory {
FieldInfo[] fieldInfos,
DeserializationInfo deserializationInfo) throws IOException, ClassNotFoundException {
if (deserializationInfo instanceof JsonDeserializationInfo) {
- return buildJsonDeserializationSchema(CommonUtils.extractNonBuiltInFieldInfos(fieldInfos));
+ return buildJsonDeserializationSchema(fieldInfos);
} else if (deserializationInfo instanceof AvroDeserializationInfo) {
- return buildAvroDeserializationSchema(CommonUtils.extractNonBuiltInFieldInfos(fieldInfos));
+ return buildAvroDeserializationSchema(fieldInfos);
} else if (deserializationInfo instanceof CanalDeserializationInfo) {
- return CanalDeserializationSchemaBuilder.build(
- CommonUtils.extractNonBuiltInFieldInfos(fieldInfos),
- (CanalDeserializationInfo) deserializationInfo);
+ return CanalDeserializationSchemaBuilder.build(fieldInfos, (CanalDeserializationInfo) deserializationInfo);
} else if (deserializationInfo instanceof DebeziumDeserializationInfo) {
- return DebeziumDeserializationSchemaBuilder.build(
- fieldInfos,
+ return DebeziumDeserializationSchemaBuilder.build(fieldInfos,
(DebeziumDeserializationInfo) deserializationInfo);
} else {
- return buildStringDeserializationSchema(CommonUtils.extractNonBuiltInFieldInfos(fieldInfos));
+ return buildStringDeserializationSchema(fieldInfos);
}
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
index 8fe46f9..1d20a20 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
@@ -20,12 +20,9 @@ package org.apache.inlong.sort.singletenant.flink.deserialization;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.TimeFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.util.DefaultValueStrategy;
@@ -34,7 +31,6 @@ import java.io.Serializable;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
-import java.util.Map;
import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
@@ -45,7 +41,7 @@ public class FieldMappingTransformer implements Serializable {
/**
* Skips time and attribute fields of source record.
*/
- public static final int SOURCE_FIELD_SKIP_STEP = 1;
+ public static final int SOURCE_FIELD_SKIP_STEP = 0;
private final FieldInfo[] outputFieldInfos;
@@ -62,12 +58,11 @@ public class FieldMappingTransformer implements Serializable {
public Row transform(Row sourceRow, long dt) {
final Row outputRow = new Row(outputFieldInfos.length);
int sourceRowIndex = SOURCE_FIELD_SKIP_STEP;
- Map<String, String> attributes = (Map<String, String>) sourceRow.getField(0);
for (int i = 0; i < outputFieldInfos.length; i++) {
Object fieldValue = null;
if (outputFieldInfos[i] instanceof BuiltInFieldInfo) {
BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) outputFieldInfos[i];
- fieldValue = transformBuiltInField(builtInFieldInfo, attributes, dt);
+ fieldValue = transformBuiltInField(builtInFieldInfo, dt);
} else if (sourceRowIndex < sourceRow.getArity()) {
fieldValue = sourceRow.getField(sourceRowIndex);
sourceRowIndex++;
@@ -81,25 +76,10 @@ public class FieldMappingTransformer implements Serializable {
return outputRow;
}
- private static Object transformBuiltInField(
- BuiltInFieldInfo builtInFieldInfo,
- Map<String, String> attributes,
- long dataTimestamp) {
- switch (builtInFieldInfo.getBuiltInField()) {
- case DATA_TIME:
- return inferDataTimeValue(builtInFieldInfo.getFormatInfo(), dataTimestamp);
- case MYSQL_METADATA_DATABASE:
- return attributes.get(MysqlBinLogData.MYSQL_METADATA_DATABASE);
- case MYSQL_METADATA_TABLE:
- return attributes.get(MysqlBinLogData.MYSQL_METADATA_TABLE);
- case MYSQL_METADATA_IS_DDL:
- return BooleanFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_IS_DDL));
- case MYSQL_METADATA_EVENT_TIME:
- return LongFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME));
- case MYSQL_METADATA_EVENT_TYPE:
- return attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE);
+ private static Object transformBuiltInField(BuiltInFieldInfo builtInFieldInfo, long dataTimestamp) {
+ if (builtInFieldInfo.getBuiltInField() == BuiltInFieldInfo.BuiltInField.DATA_TIME) {
+ return inferDataTimeValue(builtInFieldInfo.getFormatInfo(), dataTimestamp);
}
-
return null;
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
index ff9941a..e77c5b1 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
@@ -17,28 +17,22 @@
package org.apache.inlong.sort.singletenant.flink.serialization;
-import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.MAP_NULL_KEY_LITERAL_DEFAULT;
-import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.getMapNullKeyMode;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.canal.CanalJsonSerializationSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonSerializationSchema;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
+
+import java.io.IOException;
+
+import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.MAP_NULL_KEY_LITERAL_DEFAULT;
+import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.getMapNullKeyMode;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
public class CanalSerializationSchemaBuilder {
@@ -51,16 +45,10 @@ public class CanalSerializationSchemaBuilder {
mapNullKeyLiteral = MAP_NULL_KEY_LITERAL_DEFAULT;
}
- FieldInfo[] originPhysicalFieldInfos = CommonUtils.extractNonBuiltInFieldInfos(fieldInfos);
- FieldInfo[] convertedPhysicalFieldInfos = convertDateToStringFormatInfo(originPhysicalFieldInfos);
- RowType convertedPhysicalRowType = convertFieldInfosToRowType(convertedPhysicalFieldInfos);
FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
-
+ RowType convertedRowType = convertFieldInfosToRowType(convertedFieldInfos);
CanalJsonSerializationSchema canalSchema = new CanalJsonSerializationSchema(
- convertedPhysicalRowType,
- getFieldIndexToMetadata(fieldInfos),
- createRowConverter(convertedFieldInfos),
- createRowConverter(convertedPhysicalFieldInfos),
+ convertedRowType,
getTimestampFormatStandard(canalSerializationInfo.getTimestampFormatStandard()),
getMapNullKeyMode(canalSerializationInfo.getMapNullKeyMod()),
mapNullKeyLiteral,
@@ -73,38 +61,4 @@ public class CanalSerializationSchemaBuilder {
return new CustomDateFormatSerializationSchemaWrapper(rowToRowDataSchema, extractFormatInfos(fieldInfos));
}
- private static Map<Integer, ReadableMetadata> getFieldIndexToMetadata(FieldInfo[] fieldInfos) {
- Map<Integer, ReadableMetadata> fieldIndexToMetadata = new HashMap<>();
-
- for (int i = 0; i < fieldInfos.length; i++) {
- FieldInfo fieldInfo = fieldInfos[i];
- if (fieldInfo instanceof BuiltInFieldInfo) {
- BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) fieldInfo;
- BuiltInField builtInField = builtInFieldInfo.getBuiltInField();
- switch (builtInField) {
- case MYSQL_METADATA_DATABASE:
- fieldIndexToMetadata.put(i, ReadableMetadata.DATABASE);
- break;
- case MYSQL_METADATA_TABLE:
- fieldIndexToMetadata.put(i, ReadableMetadata.TABLE);
- break;
- case MYSQL_METADATA_EVENT_TIME:
- fieldIndexToMetadata.put(i, ReadableMetadata.EVENT_TIMESTAMP);
- break;
- case MYSQL_METADATA_IS_DDL:
- fieldIndexToMetadata.put(i, ReadableMetadata.IS_DDL);
- break;
- case MYSQL_METADATA_EVENT_TYPE:
- // We will always append `type` to the result
- break;
- default:
- throw new IllegalArgumentException(
- "Unsupported builtin field '" + builtInField + "' in debezium deserialization");
- }
- }
- }
-
- return fieldIndexToMetadata;
- }
-
}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index 92c0c13..a5636ed 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sort.singletenant.flink.utils;
-import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
@@ -36,7 +35,6 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimeFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.formats.common.TypeInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import java.io.ByteArrayInputStream;
@@ -195,10 +193,4 @@ public class CommonUtils {
return output;
}
- public static FieldInfo[] extractNonBuiltInFieldInfos(FieldInfo[] fieldInfos) {
- return Arrays.stream(fieldInfos)
- .filter(fieldInfo -> !(fieldInfo instanceof BuiltInFieldInfo))
- .toArray(FieldInfo[]::new);
- }
-
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
deleted file mode 100644
index b1e994c..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.types.Row;
-import org.apache.inlong.common.msg.InLongMsg;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
-import org.apache.inlong.sort.formats.common.IntFormatInfo;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
-import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
-import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
-import org.apache.inlong.sort.singletenant.flink.deserialization.FieldMappingTransformer;
-import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DebeziumToCanalITCase {
-
- private static final Logger logger = LoggerFactory.getLogger(DebeziumToCanalITCase.class);
-
- private static final CountDownLatch verificationFinishedLatch = new CountDownLatch(1);
-
- private static final CountDownLatch jobFinishedLatch = new CountDownLatch(1);
-
- private final FieldInfo[] fieldInfos = new FieldInfo[]{
- new FieldInfo("name", StringFormatInfo.INSTANCE),
- new FieldInfo("age", IntFormatInfo.INSTANCE),
- new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
- new BuiltInFieldInfo("isDdl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
- new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
- };
-
- private static final String expectedResult =
- "{\"data\":[{\"name\":\"testName\",\"age\":29}],"
- + "\"type\":\"INSERT\",\"database\":\"test\",\"table\":\"test\","
- + "\"es\":1644896917208,\"isDdl\":false}";
-
- @Test(timeout = 60 * 1000)
- public void test() throws Exception {
-
- final ExecutorService executorService = Executors.newSingleThreadExecutor();
-
- executorService.execute(() -> {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- try {
- DataStream<SerializedRecord> sourceStream = env.addSource(new TestSource());
-
- // Deserialize
- DeserializationSchema<Row> deserializationSchema = DeserializationSchemaFactory.build(
- fieldInfos,
- new DebeziumDeserializationInfo(false, "ISO_8601"));
- FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(
- new Configuration(), fieldInfos);
- DeserializationFunction function = new DeserializationFunction(
- deserializationSchema, fieldMappingTransformer, false);
- DataStream<Row> deserializedStream = sourceStream.process(function);
-
- // Serialize and output
- SerializationSchema<Row> serializationSchema = SerializationSchemaFactory.build(
- fieldInfos,
- new CanalSerializationInfo("Sql", "Literal", null, false)
- );
- deserializedStream.addSink(new TestSink(serializationSchema));
-
- env.execute();
-
- } catch (Exception e) {
- logger.error("Error occurred when executing flink test job: ", e);
- } finally {
- jobFinishedLatch.countDown();
- }
- });
-
- try {
- while (!verify()) {
- Thread.sleep(500);
- }
- verificationFinishedLatch.countDown();
- jobFinishedLatch.await();
- } finally {
- executorService.shutdown();
- }
-
- Thread.sleep(10000);
- }
-
- private boolean verify() {
- if (TestSink.results.size() == 0) {
- return false;
- } else {
- assertEquals(expectedResult, TestSink.results.get(0));
- return true;
- }
- }
-
- private static class TestSource extends RichSourceFunction<SerializedRecord> {
-
- String testString = "{\n"
- + " \"before\":null,\n"
- + " \"after\":{\n"
- + " \"name\":\"testName\",\n"
- + " \"age\":29\n"
- + " },\n"
- + " \"source\":{\n"
- + " \"version\":\"1.4.2.Final\",\n"
- + " \"connector\":\"mysql\",\n"
- + " \"name\":\"my_server_01\",\n"
- + " \"ts_ms\":1644896917000,\n"
- + " \"snapshot\":\"false\",\n"
- + " \"db\":\"test\",\n"
- + " \"table\":\"test\",\n"
- + " \"server_id\":1,\n"
- + " \"gtid\":null,\n"
- + " \"file\":\"mysql-bin.000067\",\n"
- + " \"pos\":944,\n"
- + " \"row\":0,\n"
- + " \"thread\":13,\n"
- + " \"query\":null\n"
- + " },\n"
- + " \"op\":\"c\",\n"
- + " \"ts_ms\":1644896917208,\n"
- + " \"transaction\":null\n"
- + "}";
-
- @Override
- public void open(org.apache.flink.configuration.Configuration configuration) {
- }
-
- @Override
- public void run(SourceContext<SerializedRecord> sourceContext) throws Exception {
- InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
- String attrs = "m=0"
- + "&dt=" + System.currentTimeMillis()
- + "&iname=" + "tid";
- inLongMsg.addMsg(attrs, testString.getBytes());
- byte[] bytes = inLongMsg.buildArray();
-
- sourceContext.collect(new SerializedRecord(System.currentTimeMillis(), bytes));
-
- verificationFinishedLatch.await();
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- private static class TestSink extends RichSinkFunction<Row> {
-
- private static final List<String> results = new ArrayList<>();
-
- private final SerializationSchema<Row> serializationSchema;
-
- public TestSink(SerializationSchema<Row> serializationSchema) {
- this.serializationSchema = serializationSchema;
- }
-
- @Override
- public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
- serializationSchema.open(null);
- }
-
- @Override
- public void invoke(Row record, Context context) {
- results.add(new String(serializationSchema.serialize(record)));
- }
- }
-}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java
index 1e90ccb..04de89b 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java
@@ -28,9 +28,6 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimeFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
import org.junit.Test;
@@ -57,42 +54,30 @@ public class DebeziumDeserializationTest {
new FieldInfo("map",
new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
- new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE))),
- new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME)
+ new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
};
private Row generateTestRow() {
- Row testRow = new Row(9);
- testRow.setField(0, new HashMap<String, String>() {
- {
- put(MysqlBinLogData.MYSQL_METADATA_IS_DDL, "false");
- put(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE, "c");
- put(MysqlBinLogData.MYSQL_METADATA_DATABASE, "test");
- put(MysqlBinLogData.MYSQL_METADATA_TABLE, "test");
- put(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME, "1644896917208");
- }
- });
- testRow.setField(1, 1238123899121L);
- testRow.setField(2, "testName");
+ Row testRow = new Row(8);
+ testRow.setField(0, 1238123899121L);
+ testRow.setField(1, "testName");
byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
- testRow.setField(3, bytes);
+ testRow.setField(2, bytes);
- testRow.setField(4, Date.valueOf("1990-10-14"));
- testRow.setField(5, Time.valueOf("12:12:43"));
- testRow.setField(6, Timestamp.valueOf("1990-10-14 12:12:43"));
+ testRow.setField(3, Date.valueOf("1990-10-14"));
+ testRow.setField(4, Time.valueOf("12:12:43"));
+ testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
Map<String, Long> map = new HashMap<>();
map.put("flink", 123L);
- testRow.setField(7, map);
+ testRow.setField(6, map);
Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
Map<String, Integer> innerMap = new HashMap<>();
innerMap.put("key", 234);
nestedMap.put("inner_map", innerMap);
- testRow.setField(8, nestedMap);
+ testRow.setField(7, nestedMap);
return testRow;
}
@@ -100,48 +85,48 @@ public class DebeziumDeserializationTest {
@Test
public void testDebeziumDeserializationSchema() throws IOException, ClassNotFoundException {
String testString = "{\n"
- + " \"before\":null,\n"
- + " \"after\":{\n"
- + " \"id\":1238123899121,\n"
- + " \"name\":\"testName\",\n"
- + " \"bytes\":\"AQIDBAUG\",\n"
- + " \"date\":\"1990-10-14\",\n"
- + " \"time\":\"12:12:43\",\n"
- + " \"timestamp\":\"1990-10-14 12:12:43\",\n"
- + " \"map\":{\n"
- + " \"flink\":123\n"
- + " },\n"
- + " \"map2map\":{\n"
- + " \"inner_map\":{\n"
- + " \"key\":234\n"
- + " }\n"
- + " }\n"
- + " },\n"
- + " \"source\":{\n"
- + " \"version\":\"1.4.2.Final\",\n"
- + " \"connector\":\"mysql\",\n"
- + " \"name\":\"my_server_01\",\n"
- + " \"ts_ms\":1644896917000,\n"
- + " \"snapshot\":\"false\",\n"
- + " \"db\":\"test\",\n"
- + " \"table\":\"test\",\n"
- + " \"server_id\":1,\n"
- + " \"gtid\":null,\n"
- + " \"file\":\"mysql-bin.000067\",\n"
- + " \"pos\":944,\n"
- + " \"row\":0,\n"
- + " \"thread\":13,\n"
- + " \"query\":null\n"
- + " },\n"
- + " \"op\":\"c\",\n"
- + " \"ts_ms\":1644896917208,\n"
- + " \"transaction\":null\n"
- + "}";
+ + " \"before\":null,\n"
+ + " \"after\":{\n"
+ + " \"id\":1238123899121,\n"
+ + " \"name\":\"testName\",\n"
+ + " \"bytes\":\"AQIDBAUG\",\n"
+ + " \"date\":\"1990-10-14\",\n"
+ + " \"time\":\"12:12:43\",\n"
+ + " \"timestamp\":\"1990-10-14 12:12:43\",\n"
+ + " \"map\":{\n"
+ + " \"flink\":123\n"
+ + " },\n"
+ + " \"map2map\":{\n"
+ + " \"inner_map\":{\n"
+ + " \"key\":234\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"source\":{\n"
+ + " \"version\":\"1.4.2.Final\",\n"
+ + " \"connector\":\"mysql\",\n"
+ + " \"name\":\"my_server_01\",\n"
+ + " \"ts_ms\":1644896917000,\n"
+ + " \"snapshot\":\"false\",\n"
+ + " \"db\":\"test\",\n"
+ + " \"table\":\"test\",\n"
+ + " \"server_id\":1,\n"
+ + " \"gtid\":null,\n"
+ + " \"file\":\"mysql-bin.000067\",\n"
+ + " \"pos\":944,\n"
+ + " \"row\":0,\n"
+ + " \"thread\":13,\n"
+ + " \"query\":null\n"
+ + " },\n"
+ + " \"op\":\"c\",\n"
+ + " \"ts_ms\":1644896917208,\n"
+ + " \"transaction\":null\n"
+ + "}";
byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);
DeserializationSchema<Row> schemaWithoutFilter = DeserializationSchemaFactory.build(
fieldInfos,
- new DebeziumDeserializationInfo(false, "ISO_8601")
+ new DebeziumDeserializationInfo(true, "ISO_8601")
);
ListCollector<Row> collector = new ListCollector<>();
schemaWithoutFilter.deserialize(testBytes, collector);
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java
index ec8ac03..d51421d 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.singletenant.flink.deserialization;
-import java.util.HashMap;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
@@ -54,7 +53,7 @@ public class FieldMappingTransformerTest {
configuration.setBoolean(SINK_FIELD_TYPE_SHORT_NULLABLE, false);
configuration.setBoolean(SINK_FIELD_TYPE_LONG_NULLABLE, false);
FieldMappingTransformer transformer = new FieldMappingTransformer(configuration, fieldInfos);
- Row resultRow = transformer.transform(Row.of(new HashMap<>(), 1), ms);
+ Row resultRow = transformer.transform(Row.of(1), ms);
assertEquals(6, resultRow.getArity());
assertEquals(new Timestamp(ms), resultRow.getField(0));
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationTest.java
deleted file mode 100644
index 82ba75d..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.serialization;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.UserCodeClassLoader;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
-import org.apache.inlong.sort.formats.common.IntFormatInfo;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.junit.Test;
-
-public class CanalSerializationTest {
-
- private final FieldInfo[] fieldInfos = new FieldInfo[]{
- new FieldInfo("name", StringFormatInfo.INSTANCE),
- new FieldInfo("age", IntFormatInfo.INSTANCE),
- new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
- new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
- new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
- new BuiltInFieldInfo("isDdl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
- new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
- };
-
- @Test
- public void test() throws Exception {
- SerializationSchema<Row> canalJsonSerializationSchema = SerializationSchemaFactory.build(
- fieldInfos,
- new CanalSerializationInfo("Sql", "Literal", null, false)
- );
- canalJsonSerializationSchema.open(new InitializationContext() {
- @Override
- public MetricGroup getMetricGroup() {
- return null;
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return null;
- }
- });
-
- Row row = Row.of(
- "name",
- 29,
- "database",
- "table",
- 123L,
- false,
- "INSERT");
- String result = new String(canalJsonSerializationSchema.serialize(row));
-
- String expectedResult =
- "{\"data\":[{\"name\":\"name\",\"age\":29}],"
- + "\"type\":\"INSERT\",\"database\":\"database\","
- + "\"table\":\"table\",\"es\":123,\"isDdl\":false}";
- assertEquals(expectedResult, result);
- }
-}