Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/29 11:01:45 UTC
[incubator-inlong] branch master updated: [INLONG-4013][Sort] Support writing metadata in canal format (#4023)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1bae36874 [INLONG-4013][Sort] Support writing metadata in canal format (#4023)
1bae36874 is described below
commit 1bae36874f35b428d516334ced82e7823bab0d89
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Fri Apr 29 19:01:39 2022 +0800
[INLONG-4013][Sort] Support writing metadata in canal format (#4023)
* Support writing metadata in canal format
* fix code style
* fix license problem
* fix ut error
* fix the scala version to a variable
---
inlong-sort/pom.xml | 6 +
inlong-sort/sort-connectors/pom.xml | 10 +
.../kafka/DynamicKafkaSerializationSchema.java | 205 +++++++++
.../inlong/sort/flink/kafka/KafkaDynamicSink.java | 471 +++++++++++++++++++++
.../sort/flink/kafka/KafkaDynamicTableFactory.java | 372 ++++++++++++++++
.../inlong/sort}/flink/kafka/KafkaSinkBuilder.java | 2 +-
.../org.apache.flink.table.factories.Factory | 16 +
.../canal/CanalJsonEnhancedDecodingFormat.java | 329 ++++++++++++++
.../CanalJsonEnhancedDeserializationSchema.java | 404 ++++++++++++++++++
.../canal/CanalJsonEnhancedEncodingFormat.java | 280 ++++++++++++
.../json/canal/CanalJsonEnhancedFormatFactory.java | 113 +++++
.../CanalJsonEnhancedSerializationSchema.java | 191 +++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../canal/CanalJsonEnhancedFormatFactoryTest.java | 138 ++++++
.../canal/CanalJsonEnhancedSerDeSchemaTest.java | 214 ++++++++++
.../src/test/resources/canal-json-inlong-data.txt | 3 +
.../src/test/resources/log4j-test.properties | 29 ++
inlong-sort/sort-formats/pom.xml | 14 +
inlong-sort/sort-single-tenant/pom.xml | 5 -
.../inlong/sort/singletenant/flink/Entrance.java | 2 +-
.../flink/parser/impl/FlinkSqlParser.java | 34 +-
.../flink/kafka/KafkaSinkTestBase.java | 4 +-
.../flink/kafka/RowToAvroKafkaSinkTest.java | 2 +-
.../flink/kafka/RowToCanalKafkaSinkTest.java | 2 +-
.../kafka/RowToDebeziumJsonKafkaSinkTest.java | 2 +-
.../flink/kafka/RowToJsonKafkaSinkTest.java | 2 +-
.../flink/kafka/RowToStringKafkaSinkTest.java | 2 +-
.../flink/parser/FlinkSqlParserTest.java | 1 -
pom.xml | 6 +
29 files changed, 2842 insertions(+), 33 deletions(-)
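For context, here is a minimal usage sketch of the feature this commit enables (not shipped with the commit; the topic, bootstrap servers, and the 'canal-json-inlong' format identifier are illustrative assumptions, while the 'kafka-inlong' connector identifier is defined in KafkaDynamicTableFactory below). A sink table can declare Kafka's writable metadata columns next to the physical payload:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WritableMetadataSinkExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // 'timestamp' and 'headers' map to KafkaDynamicSink.WritableMetadata and
        // are written onto each outgoing ProducerRecord
        tEnv.executeSql(
                "CREATE TABLE user_sink (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        + "  `timestamp` TIMESTAMP_LTZ(3) METADATA,\n"    // writable connector metadata
                        + "  headers MAP<STRING, BYTES> METADATA\n"       // writable connector metadata
                        + ") WITH (\n"
                        + "  'connector' = 'kafka-inlong',\n"             // identifier from this commit
                        + "  'topic' = 'user_topic',\n"                   // hypothetical topic
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'canal-json-inlong'\n"            // assumed format identifier
                        + ")");
    }
}

On emit, the 'timestamp' and 'headers' columns are read back out of the consumed row and placed onto each ProducerRecord, as implemented in DynamicKafkaSerializationSchema below.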
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 5844c4bc4..3d1f326e6 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -75,6 +75,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index c6d213ba8..f4713775c 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -59,6 +59,11 @@
<artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
@@ -98,6 +103,11 @@
<groupId>org.antlr</groupId>
<artifactId>ST4</artifactId>
</dependency>
+ <!--for kafka-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${flink.scala.binary.version}</artifactId>
+ </dependency>
<!--for pulsar-->
<dependency>
<groupId>org.apache.pulsar</groupId>
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/DynamicKafkaSerializationSchema.java
new file mode 100644
index 000000000..be3f26487
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/DynamicKafkaSerializationSchema.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.flink.kafka.KafkaDynamicSink.WritableMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSink}.
+ */
+class DynamicKafkaSerializationSchema
+ implements KafkaSerializationSchema<RowData>, KafkaContextAware<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+ private final String topic;
+
+ private final @Nullable SerializationSchema<RowData> keySerialization;
+
+ private final SerializationSchema<RowData> valueSerialization;
+
+ private final RowData.FieldGetter[] keyFieldGetters;
+
+ private final RowData.FieldGetter[] valueFieldGetters;
+
+ private final boolean hasMetadata;
+
+ private final boolean upsertMode;
+
+ /**
+ * Contains the position for each value of {@link WritableMetadata} in the
+ * consumed row, or -1 if this metadata key is not used.
+ */
+ private final int[] metadataPositions;
+
+ private int[] partitions;
+
+ private int parallelInstanceId;
+
+ private int numParallelInstances;
+
+ DynamicKafkaSerializationSchema(
+ String topic,
+ @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+ @Nullable SerializationSchema<RowData> keySerialization,
+ SerializationSchema<RowData> valueSerialization,
+ RowData.FieldGetter[] keyFieldGetters,
+ RowData.FieldGetter[] valueFieldGetters,
+ boolean hasMetadata,
+ int[] metadataPositions,
+ boolean upsertMode) {
+ if (upsertMode) {
+ Preconditions.checkArgument(
+ keySerialization != null && keyFieldGetters.length > 0,
+ "Key must be set in upsert mode for serialization schema.");
+ }
+ this.topic = topic;
+ this.partitioner = partitioner;
+ this.keySerialization = keySerialization;
+ this.valueSerialization = valueSerialization;
+ this.keyFieldGetters = keyFieldGetters;
+ this.valueFieldGetters = valueFieldGetters;
+ this.hasMetadata = hasMetadata;
+ this.metadataPositions = metadataPositions;
+ this.upsertMode = upsertMode;
+ }
+
+ @Override
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
+ if (keySerialization != null) {
+ keySerialization.open(context);
+ }
+ valueSerialization.open(context);
+ if (partitioner != null) {
+ partitioner.open(parallelInstanceId, numParallelInstances);
+ }
+ }
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
+ // shortcut in case no input projection is required
+ if (keySerialization == null && !hasMetadata) {
+ final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+ return new ProducerRecord<>(
+ topic,
+ extractPartition(consumedRow, null, valueSerialized),
+ null,
+ valueSerialized);
+ }
+
+ final byte[] keySerialized;
+ if (keySerialization == null) {
+ keySerialized = null;
+ } else {
+ final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
+ keySerialized = keySerialization.serialize(keyRow);
+ }
+
+ final byte[] valueSerialized;
+ final RowKind kind = consumedRow.getRowKind();
+ final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
+ if (upsertMode) {
+ if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
+ // transform the message into a tombstone message
+ valueSerialized = null;
+ } else {
+ // set the message's RowKind to INSERT to comply with the INSERT-only format
+ valueRow.setRowKind(RowKind.INSERT);
+ valueSerialized = valueSerialization.serialize(valueRow);
+ }
+ } else {
+ valueSerialized = valueSerialization.serialize(valueRow);
+ }
+
+ return new ProducerRecord<>(
+ topic,
+ extractPartition(consumedRow, keySerialized, valueSerialized),
+ readMetadata(consumedRow, WritableMetadata.TIMESTAMP),
+ keySerialized,
+ valueSerialized,
+ readMetadata(consumedRow, WritableMetadata.HEADERS));
+ }
+
+ @Override
+ public void setParallelInstanceId(int parallelInstanceId) {
+ this.parallelInstanceId = parallelInstanceId;
+ }
+
+ @Override
+ public void setNumParallelInstances(int numParallelInstances) {
+ this.numParallelInstances = numParallelInstances;
+ }
+
+ @Override
+ public void setPartitions(int[] partitions) {
+ this.partitions = partitions;
+ }
+
+ @Override
+ public String getTargetTopic(RowData element) {
+ return topic;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T readMetadata(RowData consumedRow, WritableMetadata metadata) {
+ final int pos = metadataPositions[metadata.ordinal()];
+ if (pos < 0) {
+ return null;
+ }
+ return (T) metadata.converter.read(consumedRow, pos);
+ }
+
+ private Integer extractPartition(
+ RowData consumedRow, @Nullable byte[] keySerialized, byte[] valueSerialized) {
+ if (partitioner != null) {
+ return partitioner.partition(
+ consumedRow, keySerialized, valueSerialized, topic, partitions);
+ }
+ return null;
+ }
+
+ static RowData createProjectedRow(
+ RowData consumedRow, RowKind kind, RowData.FieldGetter[] fieldGetters) {
+ final int arity = fieldGetters.length;
+ final GenericRowData genericRowData = new GenericRowData(kind, arity);
+ for (int fieldPos = 0; fieldPos < arity; fieldPos++) {
+ genericRowData.setField(fieldPos, fieldGetters[fieldPos].getFieldOrNull(consumedRow));
+ }
+ return genericRowData;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ interface MetadataConverter extends Serializable {
+ Object read(RowData consumedRow, int pos);
+ }
+}
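To make the projection logic above concrete, here is a small self-contained sketch (assuming two physical columns and only the 'timestamp' key requested) of how the metadataPositions array is derived: requested metadata columns are appended after the physical columns, and unused keys resolve to -1, which is why readMetadata returns null for them.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class MetadataPositionsDemo {

    // mirrors the WritableMetadata enum in KafkaDynamicSink (keys are the lower-case names)
    enum WritableMetadata { HEADERS, TIMESTAMP }

    public static void main(String[] args) {
        List<String> metadataKeys = Arrays.asList("timestamp"); // keys requested by the planner
        int physicalArity = 2;                                  // e.g. physical columns [id, name]
        int[] metadataPositions = Stream.of(WritableMetadata.values())
                .mapToInt(m -> {
                    int pos = metadataKeys.indexOf(m.name().toLowerCase());
                    // -1 means "metadata key not used"; otherwise the metadata column
                    // sits after the physical columns in the consumed row
                    return pos < 0 ? -1 : physicalArity + pos;
                })
                .toArray();
        System.out.println(Arrays.toString(metadataPositions)); // prints [-1, 2]
    }
}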
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicSink.java
new file mode 100644
index 000000000..cf2980b30
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicSink.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.inlong.sort.flink.kafka.DynamicKafkaSerializationSchema.MetadataConverter;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A version-agnostic Kafka {@link DynamicTableSink}. */
+@Internal
+public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Metadata that is appended at the end of a physical sink row. */
+ protected List<String> metadataKeys;
+
+ // --------------------------------------------------------------------------------------------
+ // Format attributes
+ // --------------------------------------------------------------------------------------------
+ private static final String VALUE_METADATA_PREFIX = "value.";
+
+ /** Data type of the consumed data. */
+ protected DataType consumedDataType;
+
+ /** Data type to configure the formats. */
+ protected final DataType physicalDataType;
+
+ /** Optional format for encoding keys to Kafka. */
+ protected final @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
+
+ /** Format for encoding values to Kafka. */
+ protected final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
+
+ /** Indices that determine the key fields and the source position in the consumed row. */
+ protected final int[] keyProjection;
+
+ /** Indices that determine the value fields and the source position in the consumed row. */
+ protected final int[] valueProjection;
+
+ /** Prefix that needs to be removed from fields when constructing the physical data type. */
+ protected final @Nullable String keyPrefix;
+
+ // --------------------------------------------------------------------------------------------
+ // Kafka-specific attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** The Kafka topic to write to. */
+ protected final String topic;
+
+ /** Properties for the Kafka producer. */
+ protected final Properties properties;
+
+ /** Partitioner to select Kafka partition for each item. */
+ protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+ /** Sink commit semantic. */
+ protected final KafkaSinkSemantic semantic;
+
+ /**
+ * Flag to determine the sink mode. In upsert mode, the sink transforms delete/update-before
+ * messages into tombstone messages.
+ */
+ protected final boolean upsertMode;
+
+ /** Sink buffer flush config, which is currently only supported in upsert mode. */
+ protected final SinkBufferFlushMode flushMode;
+
+ /** Parallelism of the physical Kafka producer. */
+ protected final @Nullable Integer parallelism;
+
+ public KafkaDynamicSink(
+ DataType consumedDataType,
+ DataType physicalDataType,
+ @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+ EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ String topic,
+ Properties properties,
+ @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+ KafkaSinkSemantic semantic,
+ boolean upsertMode,
+ SinkBufferFlushMode flushMode,
+ @Nullable Integer parallelism) {
+ // Format attributes
+ this.consumedDataType =
+ checkNotNull(consumedDataType, "Consumed data type must not be null.");
+ this.physicalDataType =
+ checkNotNull(physicalDataType, "Physical data type must not be null.");
+ this.keyEncodingFormat = keyEncodingFormat;
+ this.valueEncodingFormat =
+ checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
+ this.keyProjection = checkNotNull(keyProjection, "Key projection must not be null.");
+ this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
+ this.keyPrefix = keyPrefix;
+ // Mutable attributes
+ this.metadataKeys = Collections.emptyList();
+ // Kafka-specific attributes
+ this.topic = checkNotNull(topic, "Topic must not be null.");
+ this.properties = checkNotNull(properties, "Properties must not be null.");
+ this.partitioner = partitioner;
+ this.semantic = checkNotNull(semantic, "Semantic must not be null.");
+ this.upsertMode = upsertMode;
+ this.flushMode = checkNotNull(flushMode);
+ if (flushMode.isEnabled() && !upsertMode) {
+ throw new IllegalArgumentException(
+ "Sink buffer flush is only supported in upsert-kafka.");
+ }
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return valueEncodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final SerializationSchema<RowData> keySerialization =
+ createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
+
+ final SerializationSchema<RowData> valueSerialization =
+ createSerialization(context, valueEncodingFormat, valueProjection, null);
+
+ final FlinkKafkaProducer<RowData> kafkaProducer =
+ createKafkaProducer(keySerialization, valueSerialization);
+
+ if (flushMode.isEnabled() && upsertMode) {
+ BufferedUpsertSinkFunction buffedSinkFunction =
+ new BufferedUpsertSinkFunction(
+ kafkaProducer,
+ physicalDataType,
+ keyProjection,
+ context.createTypeInformation(consumedDataType),
+ flushMode);
+ return SinkFunctionProvider.of(buffedSinkFunction, parallelism);
+ } else {
+ return SinkFunctionProvider.of(kafkaProducer, parallelism);
+ }
+ }
+
+ @Override
+ public Map<String, DataType> listWritableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+
+ // add value format metadata with prefix
+ valueEncodingFormat
+ .listWritableMetadata()
+ .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+ // add connector metadata
+ Stream.of(WritableMetadata.values())
+ .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+ // separate connector and format metadata
+ final List<String> formatMetadataKeys =
+ metadataKeys.stream()
+ .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+ .collect(Collectors.toList());
+ final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+ connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+ // push down format metadata
+ final Map<String, DataType> formatMetadata = valueEncodingFormat.listWritableMetadata();
+ if (formatMetadata.size() > 0) {
+ final List<String> requestedFormatMetadataKeys =
+ formatMetadataKeys.stream()
+ .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+ .collect(Collectors.toList());
+ valueEncodingFormat.applyWritableMetadata(requestedFormatMetadataKeys);
+ }
+
+ this.metadataKeys = connectorMetadataKeys;
+ this.consumedDataType = consumedDataType;
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ final KafkaDynamicSink copy =
+ new KafkaDynamicSink(
+ consumedDataType,
+ physicalDataType,
+ keyEncodingFormat,
+ valueEncodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topic,
+ properties,
+ partitioner,
+ semantic,
+ upsertMode,
+ flushMode,
+ parallelism);
+ copy.metadataKeys = metadataKeys;
+ return copy;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Kafka table sink";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final KafkaDynamicSink that = (KafkaDynamicSink) o;
+ return Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(consumedDataType, that.consumedDataType)
+ && Objects.equals(physicalDataType, that.physicalDataType)
+ && Objects.equals(keyEncodingFormat, that.keyEncodingFormat)
+ && Objects.equals(valueEncodingFormat, that.valueEncodingFormat)
+ && Arrays.equals(keyProjection, that.keyProjection)
+ && Arrays.equals(valueProjection, that.valueProjection)
+ && Objects.equals(keyPrefix, that.keyPrefix)
+ && Objects.equals(topic, that.topic)
+ && Objects.equals(properties, that.properties)
+ && Objects.equals(partitioner, that.partitioner)
+ && Objects.equals(semantic, that.semantic)
+ && Objects.equals(upsertMode, that.upsertMode)
+ && Objects.equals(flushMode, that.flushMode)
+ && Objects.equals(parallelism, that.parallelism);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ metadataKeys,
+ consumedDataType,
+ physicalDataType,
+ keyEncodingFormat,
+ valueEncodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topic,
+ properties,
+ partitioner,
+ semantic,
+ upsertMode,
+ flushMode,
+ parallelism);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ protected FlinkKafkaProducer<RowData> createKafkaProducer(
+ SerializationSchema<RowData> keySerialization,
+ SerializationSchema<RowData> valueSerialization) {
+ final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+
+ final RowData.FieldGetter[] keyFieldGetters =
+ Arrays.stream(keyProjection)
+ .mapToObj(
+ targetField ->
+ RowData.createFieldGetter(
+ physicalChildren.get(targetField), targetField))
+ .toArray(RowData.FieldGetter[]::new);
+
+ final RowData.FieldGetter[] valueFieldGetters =
+ Arrays.stream(valueProjection)
+ .mapToObj(
+ targetField ->
+ RowData.createFieldGetter(
+ physicalChildren.get(targetField), targetField))
+ .toArray(RowData.FieldGetter[]::new);
+
+ // determine the positions of metadata in the consumed row
+ final int[] metadataPositions =
+ Stream.of(WritableMetadata.values())
+ .mapToInt(
+ m -> {
+ final int pos = metadataKeys.indexOf(m.key);
+ if (pos < 0) {
+ return -1;
+ }
+ return physicalChildren.size() + pos;
+ })
+ .toArray();
+
+ // check if metadata is used at all
+ final boolean hasMetadata = metadataKeys.size() > 0;
+
+ final DynamicKafkaSerializationSchema kafkaSerializer =
+ new DynamicKafkaSerializationSchema(
+ topic,
+ partitioner,
+ keySerialization,
+ valueSerialization,
+ keyFieldGetters,
+ valueFieldGetters,
+ hasMetadata,
+ metadataPositions,
+ upsertMode);
+
+ return new FlinkKafkaProducer<>(
+ topic,
+ kafkaSerializer,
+ properties,
+ FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ }
+
+ private @Nullable SerializationSchema<RowData> createSerialization(
+ DynamicTableSink.Context context,
+ @Nullable EncodingFormat<SerializationSchema<RowData>> format,
+ int[] projection,
+ @Nullable String prefix) {
+ if (format == null) {
+ return null;
+ }
+ DataType physicalFormatDataType =
+ DataTypeUtils.projectRow(this.physicalDataType, projection);
+ if (prefix != null) {
+ physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+ }
+ return format.createRuntimeEncoder(context, physicalFormatDataType);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ enum WritableMetadata {
+ HEADERS(
+ "headers",
+ // key and value of the map are nullable to make handling easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+ .nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ final MapData map = row.getMap(pos);
+ final ArrayData keyArray = map.keyArray();
+ final ArrayData valueArray = map.valueArray();
+ final List<Header> headers = new ArrayList<>();
+ for (int i = 0; i < keyArray.size(); i++) {
+ if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+ final String key = keyArray.getString(i).toString();
+ final byte[] value = valueArray.getBinary(i);
+ headers.add(new KafkaHeader(key, value));
+ }
+ }
+ return headers;
+ }
+ }),
+
+ TIMESTAMP(
+ "timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(RowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getTimestamp(pos, 3).getMillisecond();
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final MetadataConverter converter;
+
+ WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static class KafkaHeader implements Header {
+
+ private final String key;
+
+ private final byte[] value;
+
+ KafkaHeader(String key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public byte[] value() {
+ return value;
+ }
+ }
+}
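A short sketch of the ordering convention behind listWritableMetadata/applyWritableMetadata above (the format keys here are placeholders): format metadata is exposed first under the "value." prefix and takes precedence, connector metadata follows, so the planner lays out the consumed row as PHYSICAL + FORMAT METADATA + CONNECTOR METADATA.

import java.util.LinkedHashMap;
import java.util.Map;

public class MetadataOrderingDemo {
    public static void main(String[] args) {
        // hypothetical keys a value format might expose
        Map<String, String> formatMetadata = new LinkedHashMap<>();
        formatMetadata.put("database", "STRING");
        formatMetadata.put("table", "STRING");

        Map<String, String> metadataMap = new LinkedHashMap<>();
        // format metadata first, prefixed with "value." (highest precedence)
        formatMetadata.forEach((k, v) -> metadataMap.put("value." + k, v));
        // connector metadata afterwards
        metadataMap.put("headers", "MAP<STRING, BYTES>");
        metadataMap.put("timestamp", "TIMESTAMP_LTZ(3)");

        // iteration order: value.database, value.table, headers, timestamp
        metadataMap.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}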
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicTableFactory.java
new file mode 100644
index 000000000..0f74f04e0
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicTableFactory.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FIELDS_INCLUDE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.autoCompleteSchemaRegistrySubject;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+
+/**
+ * Copied from org.apache.flink:flink-connector-kafka_2.11:1.13.5.
+ *
+ * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link
+ * KafkaDynamicSink}. We modified {@link KafkaDynamicSink} to support writing format metadata.
+ */
+@Internal
+public class KafkaDynamicTableFactory
+ implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "kafka-inlong";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FactoryUtil.FORMAT);
+ options.add(KEY_FORMAT);
+ options.add(KEY_FIELDS);
+ options.add(KEY_FIELDS_PREFIX);
+ options.add(VALUE_FORMAT);
+ options.add(VALUE_FIELDS_INCLUDE);
+ options.add(TOPIC);
+ options.add(TOPIC_PATTERN);
+ options.add(PROPS_GROUP_ID);
+ options.add(SCAN_STARTUP_MODE);
+ options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+ options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+ options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ options.add(SINK_PARTITIONER);
+ options.add(SINK_SEMANTIC);
+ options.add(SINK_PARALLELISM);
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+ getKeyDecodingFormat(helper);
+
+ final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+ getValueDecodingFormat(helper);
+
+ helper.validateExcept(PROPERTIES_PREFIX);
+
+ validateTableSourceOptions(tableOptions);
+
+ validatePKConstraints(
+ context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+
+ final StartupOptions startupOptions = getStartupOptions(tableOptions);
+
+ final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+
+ // add topic-partition discovery
+ properties.setProperty(
+ FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+ String.valueOf(
+ tableOptions
+ .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+ .map(Duration::toMillis)
+ .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
+
+ final DataType physicalDataType =
+ context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+ final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
+
+ final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
+
+ final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+ return createKafkaTableSource(
+ physicalDataType,
+ keyDecodingFormat.orElse(null),
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ KafkaOptions.getSourceTopics(tableOptions),
+ KafkaOptions.getSourceTopicPattern(tableOptions),
+ properties,
+ startupOptions.startupMode,
+ startupOptions.specificOffsets,
+ startupOptions.startupTimestampMillis);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(
+ this, autoCompleteSchemaRegistrySubject(context));
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
+ getKeyEncodingFormat(helper);
+
+ final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+ getValueEncodingFormat(helper);
+
+ helper.validateExcept(PROPERTIES_PREFIX);
+
+ validateTableSinkOptions(tableOptions);
+
+ validatePKConstraints(
+ context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+
+ final DataType physicalDataType =
+ context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+ final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
+
+ final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
+
+ final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+ final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+
+ return createKafkaTableSink(
+ physicalDataType,
+ keyEncodingFormat.orElse(null),
+ valueEncodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ tableOptions.get(TOPIC).get(0),
+ getKafkaProperties(context.getCatalogTable().getOptions()),
+ getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
+ getSinkSemantic(tableOptions),
+ parallelism);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
+ TableFactoryHelper helper) {
+ final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+ helper.discoverOptionalDecodingFormat(
+ DeserializationFormatFactory.class, KEY_FORMAT);
+ keyDecodingFormat.ifPresent(
+ format -> {
+ if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "A key format should only deal with INSERT-only records. "
+ + "But %s has a changelog mode of %s.",
+ helper.getOptions().get(KEY_FORMAT),
+ format.getChangelogMode()));
+ }
+ });
+ return keyDecodingFormat;
+ }
+
+ private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
+ TableFactoryHelper helper) {
+ final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
+ helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
+ keyEncodingFormat.ifPresent(
+ format -> {
+ if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "A key format should only deal with INSERT-only records. "
+ + "But %s has a changelog mode of %s.",
+ helper.getOptions().get(KEY_FORMAT),
+ format.getChangelogMode()));
+ }
+ });
+ return keyEncodingFormat;
+ }
+
+ private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalDecodingFormat(
+ DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+ .orElseGet(
+ () ->
+ helper.discoverDecodingFormat(
+ DeserializationFormatFactory.class, VALUE_FORMAT));
+ }
+
+ private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ return helper.discoverOptionalEncodingFormat(
+ SerializationFormatFactory.class, FactoryUtil.FORMAT)
+ .orElseGet(
+ () ->
+ helper.discoverEncodingFormat(
+ SerializationFormatFactory.class, VALUE_FORMAT));
+ }
+
+ private static void validatePKConstraints(
+ ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
+ if (catalogTable.getSchema().getPrimaryKey().isPresent()
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ Configuration options = Configuration.fromMap(catalogTable.getOptions());
+ String formatName =
+ options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+ throw new ValidationException(
+ String.format(
+ "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+ + " on the table, because it can't guarantee the semantic of primary key.",
+ tableName.asSummaryString(), formatName));
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ protected KafkaDynamicSource createKafkaTableSource(
+ DataType physicalDataType,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
+ Properties properties,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets,
+ long startupTimestampMillis) {
+ return new KafkaDynamicSource(
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topics,
+ topicPattern,
+ properties,
+ startupMode,
+ specificStartupOffsets,
+ startupTimestampMillis,
+ false);
+ }
+
+ protected KafkaDynamicSink createKafkaTableSink(
+ DataType physicalDataType,
+ @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+ EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<RowData> partitioner,
+ KafkaSinkSemantic semantic,
+ Integer parallelism) {
+ return new KafkaDynamicSink(
+ physicalDataType,
+ physicalDataType,
+ keyEncodingFormat,
+ valueEncodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ topic,
+ properties,
+ partitioner,
+ semantic,
+ false,
+ SinkBufferFlushMode.DISABLED,
+ parallelism);
+ }
+}
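The validation in getKeyEncodingFormat/getKeyDecodingFormat above reduces to a ChangelogMode check; a minimal sketch (the retracting mode is constructed purely for demonstration):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class KeyFormatCheckDemo {
    public static void main(String[] args) {
        // an INSERT-only key format passes the check
        ChangelogMode insertOnly = ChangelogMode.insertOnly();
        System.out.println(insertOnly.containsOnly(RowKind.INSERT)); // true -> accepted

        // a changelog-producing format fails it and triggers the ValidationException
        ChangelogMode retracting = ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.DELETE)
                .build();
        System.out.println(retracting.containsOnly(RowKind.INSERT)); // false -> rejected
    }
}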
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaSinkBuilder.java
similarity index 97%
rename from inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
rename to inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaSinkBuilder.java
index f7b20ecc1..c7d08a7e1 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaSinkBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
diff --git a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..9719b692b
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
\ No newline at end of file
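Assuming this file lands on the classpath at the standard META-INF/services location, Flink resolves 'connector' = 'kafka-inlong' to the class listed above via java.util.ServiceLoader. A minimal discovery sketch:

import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class FactoryDiscoveryDemo {
    public static void main(String[] args) {
        // ServiceLoader-based lookup by factory identifier, as FactoryUtil does
        // internally when it sees 'connector' = 'kafka-inlong' in a DDL
        DynamicTableSinkFactory factory = FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DynamicTableSinkFactory.class,
                "kafka-inlong");
        System.out.println(factory.getClass().getName());
    }
}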
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
new file mode 100644
index 000000000..7e77e1eb0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDeserializationSchema.MetadataConverter;
+
+/**
+ * {@link DecodingFormat} for Canal using JSON encoding.
+ * Unlike the Flink 1.13.5 version, this format supports more metadata.
+ */
+public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ private List<String> metadataKeys;
+
+ // --------------------------------------------------------------------------------------------
+ // Canal-specific attributes
+ // --------------------------------------------------------------------------------------------
+
+ private final @Nullable String database;
+
+ private final @Nullable String table;
+
+ private final boolean ignoreParseErrors;
+
+ private final TimestampFormat timestampFormat;
+
+ public CanalJsonEnhancedDecodingFormat(
+ String database,
+ String table,
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat) {
+ this.database = database;
+ this.table = table;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormat = timestampFormat;
+ this.metadataKeys = Collections.emptyList();
+ }
+
+ @Override
+ public DeserializationSchema<RowData> createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType physicalDataType) {
+ final List<ReadableMetadata> readableMetadata =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .<IllegalStateException>orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+ final List<DataTypes.Field> metadataFields =
+ readableMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList());
+ final DataType producedDataType =
+ DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+ final TypeInformation<RowData> producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+ return CanalJsonEnhancedDeserializationSchema.builder(
+ physicalDataType, readableMetadata, producedTypeInfo)
+ .setDatabase(database)
+ .setTable(table)
+ .setIgnoreParseErrors(ignoreParseErrors)
+ .setTimestampFormat(timestampFormat)
+ .build();
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+ return metadataMap;
+ }
+
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ /** List of metadata that can be read with this format. */
+ public enum ReadableMetadata {
+ DATABASE(
+ "database",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("database", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+
+ TABLE(
+ "table",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("table", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+
+ SQL_TYPE(
+ "sql-type",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
+ DataTypes.FIELD(
+ "sqlType",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getMap(pos);
+ }
+ }),
+
+ PK_NAMES(
+ "pk-names",
+ DataTypes.ARRAY(DataTypes.STRING()).nullable(),
+ DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getArray(pos);
+ }
+ }),
+
+ INGESTION_TIMESTAMP(
+ "ingestion-timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.FIELD("ts", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return TimestampData.fromEpochMillis(row.getLong(pos));
+ }
+ }),
+
+ EVENT_TIMESTAMP(
+ "event-timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.FIELD("es", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return TimestampData.fromEpochMillis(row.getLong(pos));
+ }
+ }),
+ // additional metadata
+ OP_TYPE(
+ "op-type",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("opType", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getString(pos);
+ }
+ }),
+ IS_DDL(
+ "is-ddl",
+ DataTypes.BOOLEAN().nullable(),
+ DataTypes.FIELD("isDdl", DataTypes.BOOLEAN()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getBoolean(pos);
+ }
+ }),
+
+ MYSQL_TYPE(
+ "mysql-type",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
+ DataTypes.FIELD("mysqlType", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getMap(pos);
+ }
+ }),
+ BATCH_ID(
+ "batch-id",
+ DataTypes.BIGINT().nullable(),
+ DataTypes.FIELD("batchId", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getLong(pos);
+ }
+ }),
+ UPDATE_BEFORE(
+ "update-before",
+ DataTypes.ARRAY(
+ DataTypes.MAP(
+ DataTypes.STRING().nullable(),
+ DataTypes.STRING().nullable())
+ .nullable())
+ .nullable(),
+ DataTypes.FIELD("updateBefore", DataTypes.ARRAY(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return row.getArray(pos);
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final DataTypes.Field requiredJsonField;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(
+ String key,
+ DataType dataType,
+ DataTypes.Field requiredJsonField,
+ MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.requiredJsonField = requiredJsonField;
+ this.converter = converter;
+ }
+
+ public String getKey() {
+ return key;
+ }
+ }
+}
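For the read side, a minimal sketch pairing the metadata keys above with the Kafka connector (topic, servers, and the 'canal-json-inlong' format identifier are assumptions): through the connector, the format's readable metadata is addressed with the 'value.' prefix and is typically declared VIRTUAL so it is excluded on writes:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadableMetadataSourceExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE user_source (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        // VIRTUAL: read-only columns filled from ReadableMetadata above
                        + "  origin_database STRING METADATA FROM 'value.database' VIRTUAL,\n"
                        + "  origin_op_type STRING METADATA FROM 'value.op-type' VIRTUAL,\n"
                        + "  batch_id BIGINT METADATA FROM 'value.batch-id' VIRTUAL\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka-inlong',\n"
                        + "  'topic' = 'user_topic',\n"                   // hypothetical topic
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'canal-json-inlong'\n"            // assumed format identifier
                        + ")");
    }
}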
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
new file mode 100644
index 000000000..90696bc14
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
+
+/**
+ * Deserialization schema from Canal JSON to Flink Table/SQL internal data structure {@link
+ * RowData}. The deserialization schema knows Canal's schema definition and can extract the database
+ * data and convert it into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
+ */
+public final class CanalJsonEnhancedDeserializationSchema implements DeserializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ private static final String FIELD_OLD = "old";
+ private static final String OP_INSERT = "INSERT";
+ private static final String OP_UPDATE = "UPDATE";
+ private static final String OP_DELETE = "DELETE";
+ private static final String OP_CREATE = "CREATE";
+
+ /** The deserializer to deserialize Canal JSON data. */
+ private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+ /** Flag that indicates that an additional projection is required for metadata. */
+ private final boolean hasMetadata;
+
+ /** Metadata to be extracted for every record. */
+ private final MetadataConverter[] metadataConverters;
+
+ /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ /** Only read changelogs from the specific database. */
+ private final @Nullable String database;
+
+ /** Only read changelogs from the specific table. */
+ private final @Nullable String table;
+
+ /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+ private final boolean ignoreParseErrors;
+
+ /** Names of fields. */
+ private final List<String> fieldNames;
+
+ /** Number of fields. */
+ private final int fieldCount;
+
+ /** Pattern of the specific database. */
+ private final Pattern databasePattern;
+
+ /** Pattern of the specific table. */
+ private final Pattern tablePattern;
+
+ private CanalJsonEnhancedDeserializationSchema(
+ DataType physicalDataType,
+ List<ReadableMetadata> requestedMetadata,
+ TypeInformation<RowData> producedTypeInfo,
+ @Nullable String database,
+ @Nullable String table,
+ boolean ignoreParseErrors,
+ TimestampFormat timestampFormat) {
+ final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+ this.jsonDeserializer =
+ new JsonRowDataDeserializationSchema(
+ jsonRowType,
+ // the result type is never used, so it's fine to pass in the produced type
+ // info
+ producedTypeInfo,
+ false, // ignoreParseErrors already contains the functionality of
+ // failOnMissingField
+ ignoreParseErrors,
+ timestampFormat);
+ this.hasMetadata = requestedMetadata.size() > 0;
+ this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+ this.producedTypeInfo = producedTypeInfo;
+ this.database = database;
+ this.table = table;
+ this.ignoreParseErrors = ignoreParseErrors;
+ final RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
+ this.fieldNames = physicalRowType.getFieldNames();
+ this.fieldCount = physicalRowType.getFieldCount();
+ this.databasePattern = database == null ? null : Pattern.compile(database);
+ this.tablePattern = table == null ? null : Pattern.compile(table);
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // Builder
+ // ------------------------------------------------------------------------------------------
+
+ /** Creates a builder for building a {@link CanalJsonEnhancedDeserializationSchema}. */
+ public static Builder builder(
+ DataType physicalDataType,
+ List<ReadableMetadata> requestedMetadata,
+ TypeInformation<RowData> producedTypeInfo) {
+ return new Builder(physicalDataType, requestedMetadata, producedTypeInfo);
+ }
+
+ /** A builder for creating a {@link CanalJsonEnhancedDeserializationSchema}. */
+ @Internal
+ public static final class Builder {
+ private final DataType physicalDataType;
+ private final List<ReadableMetadata> requestedMetadata;
+ private final TypeInformation<RowData> producedTypeInfo;
+ private String database = null;
+ private String table = null;
+ private boolean ignoreParseErrors = false;
+ private TimestampFormat timestampFormat = TimestampFormat.SQL;
+
+ private Builder(
+ DataType physicalDataType,
+ List<ReadableMetadata> requestedMetadata,
+ TypeInformation<RowData> producedTypeInfo) {
+ this.physicalDataType = physicalDataType;
+ this.requestedMetadata = requestedMetadata;
+ this.producedTypeInfo = producedTypeInfo;
+ }
+
+ public Builder setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public Builder setTable(String table) {
+ this.table = table;
+ return this;
+ }
+
+ public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+ this.ignoreParseErrors = ignoreParseErrors;
+ return this;
+ }
+
+ public Builder setTimestampFormat(TimestampFormat timestampFormat) {
+ this.timestampFormat = timestampFormat;
+ return this;
+ }
+
+ public CanalJsonEnhancedDeserializationSchema build() {
+ return new CanalJsonEnhancedDeserializationSchema(
+ physicalDataType,
+ requestedMetadata,
+ producedTypeInfo,
+ database,
+ table,
+ ignoreParseErrors,
+ timestampFormat);
+ }
+ }
+
+ // ------------------------------------------------------------------------------------------
+
+ @Override
+ public RowData deserialize(byte[] message) throws IOException {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+ }
+
+ @Override
+ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
+ if (message == null || message.length == 0) {
+ return;
+ }
+ try {
+ final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
+ if (database != null) {
+ if (!databasePattern
+ .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
+ .matches()) {
+ return;
+ }
+ }
+ if (table != null) {
+ if (!tablePattern
+ .matcher(root.get(ReadableMetadata.TABLE.key).asText())
+ .matches()) {
+ return;
+ }
+ }
+ final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
+ String type = row.getString(2).toString(); // "type" field
+ if (OP_INSERT.equals(type)) {
+ // "data" field is an array of row, contains inserted rows
+ ArrayData data = row.getArray(0);
+ for (int i = 0; i < data.size(); i++) {
+ GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
+ insert.setRowKind(RowKind.INSERT);
+ emitRow(row, insert, out);
+ }
+ } else if (OP_UPDATE.equals(type)) {
+ // "data" field is an array of row, contains new rows
+ ArrayData data = row.getArray(0);
+ // "old" field is an array of row, contains old values
+ ArrayData old = row.getArray(1);
+ for (int i = 0; i < data.size(); i++) {
+ // the underlying JSON deserialization schema always produces GenericRowData.
+ GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
+ GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
+ final JsonNode oldField = root.get(FIELD_OLD);
+ for (int f = 0; f < fieldCount; f++) {
+ if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
+ // a field present in "old" (before) means the field was changed;
+ // a field missing from "old" means it was not changed,
+ // so we copy the unchanged fields from "after" into "before"
+ before.setField(f, after.getField(f));
+ }
+ }
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ emitRow(row, before, out);
+ emitRow(row, after, out);
+ }
+ } else if (OP_DELETE.equals(type)) {
+ // "data" field is an array of row, contains deleted rows
+ ArrayData data = row.getArray(0);
+ for (int i = 0; i < data.size(); i++) {
+ GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
+ insert.setRowKind(RowKind.DELETE);
+ emitRow(row, insert, out);
+ }
+ } else if (OP_CREATE.equals(type)) {
+ // "data" field is null and "type" is "CREATE" which means
+ // this is a DDL change event, and we should skip it.
+ return;
+ } else {
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format(
+ "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
+ type, new String(message)));
+ }
+ }
+ } catch (Throwable t) {
+ // a big try catch to protect the processing.
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format("Corrupt Canal JSON message '%s'.", new String(message)), t);
+ }
+ }
+ }
+
+ private void emitRow(
+ GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+ // shortcut in case no output projection is required
+ if (!hasMetadata) {
+ out.collect(physicalRow);
+ return;
+ }
+ final int physicalArity = physicalRow.getArity();
+ final int metadataArity = metadataConverters.length;
+ final GenericRowData producedRow =
+ new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+ for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+ producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+ }
+ for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+ }
+ out.collect(producedRow);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CanalJsonEnhancedDeserializationSchema that = (CanalJsonEnhancedDeserializationSchema) o;
+ return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+ && hasMetadata == that.hasMetadata
+ && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+ && Objects.equals(database, that.database)
+ && Objects.equals(table, that.table)
+ && ignoreParseErrors == that.ignoreParseErrors
+ && fieldCount == that.fieldCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ jsonDeserializer,
+ hasMetadata,
+ producedTypeInfo,
+ database,
+ table,
+ ignoreParseErrors,
+ fieldCount);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static RowType createJsonRowType(
+ DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+ // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
+ DataType root =
+ DataTypes.ROW(
+ DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
+ DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)),
+ DataTypes.FIELD("type", DataTypes.STRING()),
+ ReadableMetadata.DATABASE.requiredJsonField,
+ ReadableMetadata.TABLE.requiredJsonField);
+ // append fields that are required for reading metadata in the root
+ final List<DataTypes.Field> rootMetadataFields =
+ readableMetadata.stream()
+ .filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE)
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+ }
+
+ private static MetadataConverter[] createMetadataConverters(
+ RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+ return requestedMetadata.stream()
+ .map(m -> convert(jsonRowType, m))
+ .toArray(MetadataConverter[]::new);
+ }
+
+ private static MetadataConverter convert(RowType jsonRowType, ReadableMetadata metadata) {
+ final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+ return new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData root, int unused) {
+ return metadata.converter.convert(root, pos);
+ }
+ };
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Converter that extracts a metadata field from the row that comes out of the JSON schema and
+ * converts it to the desired data type.
+ */
+ interface MetadataConverter extends Serializable {
+
+ // Method for top-level access.
+ default Object convert(GenericRowData row) {
+ return convert(row, -1);
+ }
+
+ Object convert(GenericRowData row, int pos);
+ }
+}
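
A minimal usage sketch of the builder above, mirroring what the decoding format does at runtime. Field names and the requested metadata key are illustrative, and the sketch assumes it lives in the same package as the format classes (like the unit test further down) so the package-private enum fields are visible:

    package org.apache.inlong.sort.formats.json.canal;

    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.utils.DataTypeUtils;
    import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;

    public class DeserializerSketch {
        public static void main(String[] args) {
            DataType physicalDataType = DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
            List<ReadableMetadata> metadata =
                    Collections.singletonList(ReadableMetadata.BATCH_ID);
            // the produced type is the physical row plus one appended column per metadata key
            DataType producedDataType = DataTypeUtils.appendRowFields(
                    physicalDataType,
                    metadata.stream()
                            .map(m -> DataTypes.FIELD(m.key, m.dataType))
                            .collect(Collectors.toList()));
            CanalJsonEnhancedDeserializationSchema schema =
                    CanalJsonEnhancedDeserializationSchema.builder(
                            physicalDataType,
                            metadata,
                            InternalTypeInfo.of(producedDataType.getLogicalType()))
                            .setIgnoreParseErrors(true)
                            .build();
            // schema.deserialize(bytes, collector) then emits rows with batchId appended
        }
    }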
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
new file mode 100644
index 000000000..a74d9ecf0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedSerializationSchema.MetadataConverter;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link org.apache.flink.table.connector.format.EncodingFormat} for Canal using JSON encoding.
+ *
+ * Different from Flink 1.13.5: this format can also write metadata into the Canal JSON envelope when used as a sink.
+ */
+public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
+
+ private List<String> metadataKeys;
+
+ private final TimestampFormat timestampFormat;
+
+ private final JsonOptions.MapNullKeyMode mapNullKeyMode;
+
+ private final String mapNullKeyLiteral;
+
+ private boolean encodeDecimalAsPlainNumber;
+
+ public CanalJsonEnhancedEncodingFormat(
+ TimestampFormat timestampFormat,
+ JsonOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ boolean encodeDecimalAsPlainNumber) {
+ this.timestampFormat = timestampFormat;
+ this.mapNullKeyMode = mapNullKeyMode;
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ this.metadataKeys = Collections.emptyList();
+ }
+
+ @Override
+ public SerializationSchema<RowData> createRuntimeEncoder(Context context, DataType physicalDataType) {
+ final List<WriteableMetadata> writeableMetadata =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(WriteableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new))
+ .collect(Collectors.toList());
+ return new CanalJsonEnhancedSerializationSchema(
+ physicalDataType,
+ writeableMetadata,
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber);
+ }
+
+ @Override
+ public Map<String, DataType> listWritableMetadata() {
+ return Arrays.stream(WriteableMetadata.values())
+ .collect(
+ Collectors.toMap(m -> m.key, m -> m.dataType));
+ }
+
+ @Override
+ public void applyWritableMetadata(List<String> metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * List of metadata that can be written into this format,
+ * i.e. the fields of the Canal JSON envelope itself.
+ */
+ enum WriteableMetadata {
+ DATABASE(
+ "database",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("database", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ TABLE(
+ "table",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("table", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ SQL_TYPE(
+ "sql-type",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
+ DataTypes.FIELD(
+ "sqlType",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getMap(pos);
+ }
+ }),
+ PK_NAMES(
+ "pk-names",
+ DataTypes.ARRAY(DataTypes.STRING()).nullable(),
+ DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getArray(pos);
+ }
+ }),
+ INGESTION_TIMESTAMP(
+ "ingestion-timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.FIELD("ts", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getTimestamp(pos, 3).getMillisecond();
+ }
+ }),
+ EVENT_TIMESTAMP(
+ "event-timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.FIELD("es", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getTimestamp(pos, 3).getMillisecond();
+ }
+ }),
+ // additional metadata
+ OP_TYPE(
+ "op-type",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("opType", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ IS_DDL(
+ "is-ddl",
+ DataTypes.BOOLEAN().nullable(),
+ DataTypes.FIELD("isDdl", DataTypes.BOOLEAN()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getBoolean(pos);
+ }
+ }),
+
+ MYSQL_TYPE(
+ "mysql-type",
+ DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
+ DataTypes.FIELD("mysqlType", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getMap(pos);
+ }
+ }),
+ BATCH_ID(
+ "batch-id",
+ DataTypes.BIGINT().nullable(),
+ DataTypes.FIELD("batchId", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getLong(pos);
+ }
+ }),
+ UPDATE_BEFORE(
+ "update-before",
+ DataTypes.ARRAY(
+ DataTypes.MAP(
+ DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable())
+ .nullable(),
+ DataTypes.FIELD("updateBefore", DataTypes.ARRAY(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(RowData row, int pos) {
+ return row.getArray(pos);
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final DataTypes.Field requiredJsonField;
+
+ final MetadataConverter converter;
+
+ WriteableMetadata(
+ String key,
+ DataType dataType,
+ DataTypes.Field requiredJsonField,
+ MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.requiredJsonField = requiredJsonField;
+ this.converter = converter;
+ }
+ }
+}
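
On the sink side, the same keys can be declared as persisted metadata columns so their values are written back into the emitted Canal JSON. A sketch continuing the hypothetical tables from the source example earlier; the column types must match the WriteableMetadata data types:

    // Hypothetical sink table persisting two writable metadata keys.
    tEnv.executeSql(
            "CREATE TABLE orders_sink (\n"
                    + "  id BIGINT,\n"
                    + "  name STRING,\n"
                    + "  batch_id BIGINT METADATA FROM 'value.batch-id',\n"
                    + "  op_type STRING METADATA FROM 'value.op-type'\n"
                    + ") WITH (\n"
                    + "  'connector' = 'kafka',\n"
                    + "  'topic' = 'orders_out',\n"
                    + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "  'format' = 'canal-json-inlong'\n"
                    + ")");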
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactory.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactory.java
new file mode 100644
index 000000000..f43d9cea7
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.formats.json.JsonOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.DATABASE_INCLUDE;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.JSON_MAP_NULL_KEY_LITERAL;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.JSON_MAP_NULL_KEY_MODE;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.TABLE_INCLUDE;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.validateDecodingFormatOptions;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.validateEncodingFormatOptions;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData {@link
+ * DeserializationSchema}.
+ * Different from Flink 1.13.5: this factory can also write (sink) metadata.
+ */
+public class CanalJsonEnhancedFormatFactory
+ implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "canal-json-inlong";
+
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateDecodingFormatOptions(formatOptions);
+
+ final String database = formatOptions.getOptional(DATABASE_INCLUDE).orElse(null);
+ final String table = formatOptions.getOptional(TABLE_INCLUDE).orElse(null);
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+ final TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
+
+ return new CanalJsonEnhancedDecodingFormat(database, table, ignoreParseErrors, timestampFormat);
+ }
+
+ @Override
+ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateEncodingFormatOptions(formatOptions);
+
+ TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
+ JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
+ String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
+
+ final boolean encodeDecimalAsPlainNumber =
+ formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+
+ return new CanalJsonEnhancedEncodingFormat(timestampFormat, mapNullKeyMode,
+ mapNullKeyLiteral, encodeDecimalAsPlainNumber);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(IGNORE_PARSE_ERRORS);
+ options.add(TIMESTAMP_FORMAT);
+ options.add(DATABASE_INCLUDE);
+ options.add(TABLE_INCLUDE);
+ options.add(JSON_MAP_NULL_KEY_MODE);
+ options.add(JSON_MAP_NULL_KEY_LITERAL);
+ options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ return options;
+ }
+}
\ No newline at end of file
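
The factory's optional keys become table options prefixed with the format identifier. A hypothetical WITH-clause fragment exercising all of them (the values are examples only; the keys match optionalOptions() and the factory test below):

    'format' = 'canal-json-inlong',
    'canal-json-inlong.ignore-parse-errors' = 'true',
    'canal-json-inlong.timestamp-format.standard' = 'ISO-8601',
    'canal-json-inlong.database.include' = 'mydb',
    'canal-json-inlong.table.include' = 'mytable',
    'canal-json-inlong.map-null-key.mode' = 'LITERAL',
    'canal-json-inlong.map-null-key.literal' = 'nullKey',
    'canal-json-inlong.encode.decimal-as-plain-number' = 'true'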
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
new file mode 100644
index 000000000..c13c3f23d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedEncodingFormat.WriteableMetadata;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Serialization schema that serializes an object of Flink Table/SQL internal data structure {@link
+ * RowData} into Canal JSON bytes.
+ * Different from Flink 1.13.5: this schema can also write metadata.
+ *
+ * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
+ */
+public class CanalJsonEnhancedSerializationSchema implements SerializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final StringData OP_INSERT = StringData.fromString("INSERT");
+ private static final StringData OP_DELETE = StringData.fromString("DELETE");
+
+ private transient GenericRowData reuse;
+
+ /** The serializer to serialize Canal JSON data. */
+ private final JsonRowDataSerializationSchema jsonSerializer;
+
+ private final RowData.FieldGetter[] physicalFieldGetter;
+
+ private final RowData.FieldGetter[] writeableMetadataFieldGetter;
+
+ /** Row schema that the JSON serializer uses to convert the output row into JSON format. */
+ private final RowType jsonRowType;
+
+ public CanalJsonEnhancedSerializationSchema(
+ DataType physicalDataType,
+ List<WriteableMetadata> writeableMetadata,
+ TimestampFormat timestampFormat,
+ JsonOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ boolean encodeDecimalAsPlainNumber) {
+ final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+ this.jsonRowType = createJsonRowType(physicalDataType, writeableMetadata);
+ this.physicalFieldGetter = IntStream.range(0, physicalChildren.size())
+ .mapToObj(targetField ->
+ RowData.createFieldGetter(physicalChildren.get(targetField), targetField))
+ .toArray(RowData.FieldGetter[]::new);
+ this.writeableMetadataFieldGetter =
+ IntStream.range(physicalChildren.size(), physicalChildren.size() + writeableMetadata.size())
+ .mapToObj(targetField -> new RowData.FieldGetter() {
+ @Nullable
+ @Override
+ public Object getFieldOrNull(RowData row) {
+ WriteableMetadata curWriteableMetadata = writeableMetadata
+ .get(targetField - physicalChildren.size());
+ return curWriteableMetadata.converter.convert(row, targetField);
+ }
+ }).toArray(RowData.FieldGetter[]::new);
+
+ this.jsonSerializer =
+ new JsonRowDataSerializationSchema(
+ jsonRowType,
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber);
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ reuse = new GenericRowData(2 + writeableMetadataFieldGetter.length);
+ }
+
+ @Override
+ public byte[] serialize(RowData row) {
+ try {
+ // physical data injection
+ GenericRowData physicalData = new GenericRowData(physicalFieldGetter.length);
+ IntStream.range(0, physicalFieldGetter.length)
+ .forEach(targetField ->
+ physicalData.setField(targetField, physicalFieldGetter[targetField].getFieldOrNull(row)));
+ ArrayData arrayData = new GenericArrayData(new RowData[] {physicalData});
+ reuse.setField(0, arrayData);
+
+ // metadata injection
+ StringData opType = rowKind2String(row.getRowKind());
+ reuse.setField(1, opType);
+ IntStream.range(0, writeableMetadataFieldGetter.length)
+ .forEach(targetField ->
+ reuse.setField(2 + targetField,
+ writeableMetadataFieldGetter[targetField].getFieldOrNull(row)));
+ return jsonSerializer.serialize(reuse);
+ } catch (Throwable t) {
+ throw new RuntimeException("Could not serialize row '" + row + "'.", t);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CanalJsonEnhancedSerializationSchema that = (CanalJsonEnhancedSerializationSchema) o;
+ return Objects.equals(jsonSerializer, that.jsonSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jsonSerializer);
+ }
+
+ private StringData rowKind2String(RowKind rowKind) {
+ switch (rowKind) {
+ case INSERT:
+ case UPDATE_AFTER:
+ return OP_INSERT;
+ case UPDATE_BEFORE:
+ case DELETE:
+ return OP_DELETE;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported operation '" + rowKind + "' for row kind.");
+ }
+ }
+
+ private static RowType createJsonRowType(DataType physicalDataType, List<WriteableMetadata> writeableMetadata) {
+ // Canal JSON contains other information, e.g. "database", "ts",
+ // but we don't need it here.
+ // "old" is also omitted, because UPDATE_BEFORE / UPDATE_AFTER cannot be paired into a single UPDATE record
+ DataType root =
+ DataTypes.ROW(
+ DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
+ DataTypes.FIELD("type", DataTypes.STRING()));
+ // append fields that are required for writing metadata in the root
+ final List<DataTypes.Field> metadataFields =
+ writeableMetadata.stream()
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Converter that extracts a metadata field from the input {@link RowData}.
+ * All metadata fields are finally spliced into a {@link GenericRowData}, which the JSON serializer then turns into a JSON string.
+ */
+ interface MetadataConverter extends Serializable {
+ Object convert(RowData inputRow, int pos);
+ }
+}
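
A sketch of the row-kind mapping above, assuming a serializer built for ROW<id BIGINT, name STRING> with an empty writable-metadata list (variable names are illustrative):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    GenericRowData row = GenericRowData.of(1L, StringData.fromString("a"));
    row.setRowKind(RowKind.UPDATE_BEFORE);
    serializer.open(null); // initializes the reused output row, as the unit test does
    byte[] bytes = serializer.serialize(row);
    // UPDATE_BEFORE and DELETE map to "DELETE"; INSERT and UPDATE_AFTER map to "INSERT",
    // so bytes decode to roughly: {"data":[{"id":1,"name":"a"}],"type":"DELETE"}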
diff --git a/inlong-sort/sort-formats/format-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..c728ddd5c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedFormatFactory
diff --git a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java
new file mode 100644
index 000000000..97375283e
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+public class CanalJsonEnhancedFormatFactoryTest {
+ private static final InternalTypeInfo<RowData> ROW_TYPE_INFO =
+ InternalTypeInfo.of(PHYSICAL_TYPE);
+
+ @Test
+ public void testUserDefinedOptions() {
+ final Map<String, String> tableOptions =
+ getModifiedOptions(opts -> {
+ opts.put("canal-json-inlong.map-null-key.mode", "LITERAL");
+ opts.put("canal-json-inlong.map-null-key.literal", "nullKey");
+ opts.put("canal-json-inlong.ignore-parse-errors", "true");
+ opts.put("canal-json-inlong.timestamp-format.standard", "ISO-8601");
+ opts.put("canal-json-inlong.database.include", "mydb");
+ opts.put("canal-json-inlong.table.include", "mytable");
+ opts.put("canal-json-inlong.map-null-key.mode", "LITERAL");
+ opts.put("canal-json-inlong.map-null-key.literal", "nullKey");
+ opts.put("canal-json-inlong.encode.decimal-as-plain-number", "true");
+ });
+
+ // test Deser
+ CanalJsonEnhancedDeserializationSchema expectedDeser =
+ CanalJsonEnhancedDeserializationSchema.builder(
+ PHYSICAL_DATA_TYPE, Collections.emptyList(), ROW_TYPE_INFO)
+ .setIgnoreParseErrors(true)
+ .setTimestampFormat(TimestampFormat.ISO_8601)
+ .setDatabase("mydb")
+ .setTable("mytable")
+ .build();
+ DeserializationSchema<RowData> actualDeser = createDeserializationSchema(tableOptions);
+ assertEquals(expectedDeser, actualDeser);
+
+ // test Ser
+ CanalJsonEnhancedSerializationSchema expectedSer =
+ new CanalJsonEnhancedSerializationSchema(
+ PHYSICAL_DATA_TYPE,
+ new ArrayList<>(),
+ TimestampFormat.ISO_8601,
+ JsonOptions.MapNullKeyMode.LITERAL,
+ "nullKey",
+ true);
+ SerializationSchema<RowData> actualSer = createSerializationSchema(tableOptions);
+ assertEquals(expectedSer, actualSer);
+ }
+
+ // ------------------------------------------------------------------------
+ // Public Tools
+ // ------------------------------------------------------------------------
+
+ public static DeserializationSchema<RowData> createDeserializationSchema(
+ Map<String, String> options) {
+ DynamicTableSource source = createTableSource(SCHEMA, options);
+
+ assert source instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) source;
+
+ return scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
+ }
+
+ public static SerializationSchema<RowData> createSerializationSchema(
+ Map<String, String> options) {
+ DynamicTableSink sink = createTableSink(SCHEMA, options);
+
+ assert sink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) sink;
+
+ return sinkMock.valueFormat.createRuntimeEncoder(
+ new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
+ }
+
+ /**
+ * Returns the full options modified by the given consumer {@code optionModifier}.
+ *
+ * @param optionModifier Consumer to modify the options
+ */
+ public static Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+ Map<String, String> options = getAllOptions();
+ optionModifier.accept(options);
+ return options;
+ }
+
+ private static Map<String, String> getAllOptions() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+ options.put("format", "canal-json-inlong");
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerDeSchemaTest.java b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerDeSchemaTest.java
new file mode 100644
index 000000000..e04bd46f9
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerDeSchemaTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.canal.CanalJsonOptions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedEncodingFormat.WriteableMetadata;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+public class CanalJsonEnhancedSerDeSchemaTest {
+ public static final String DATABASE = "TEST";
+
+ public static final String TABLE = "TEST";
+
+ public static final ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.metadata("database", DataTypes.BOOLEAN(), "database", false),
+ Column.physical("id", DataTypes.BIGINT()),
+ Column.physical("name", DataTypes.STRING()),
+ Column.metadata("table", DataTypes.BOOLEAN(), "table", false),
+ Column.metadata("sql_type",
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), "sql-type", false),
+ Column.metadata("pk_names",
+ DataTypes.ARRAY(DataTypes.STRING()), "pk-names", false),
+ Column.metadata("ingestion_timestamp",
+ DataTypes.TIMESTAMP_LTZ(3), "ingestion-timestamp", false),
+ Column.metadata("event_timestamp",
+ DataTypes.TIMESTAMP_LTZ(3), "event-timestamp", false),
+ Column.metadata("op_type", DataTypes.STRING(), "op-type", false),
+ Column.metadata("is_ddl", DataTypes.BOOLEAN(), "is-ddl", false),
+ Column.metadata("mysql_type",
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), "mysql-type", false),
+ Column.metadata("batch_id", DataTypes.BIGINT(), "batch-id", false),
+ Column.metadata("update_before",
+ DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ "update-before", false));
+
+ public static final DataType PHYSICAL_DATA_TYPE = SCHEMA.toPhysicalRowDataType();
+
+ public static final List<ReadableMetadata> READABLE_METADATA =
+ Stream.of(
+ ReadableMetadata.DATABASE,
+ ReadableMetadata.TABLE,
+ ReadableMetadata.SQL_TYPE,
+ ReadableMetadata.PK_NAMES,
+ ReadableMetadata.INGESTION_TIMESTAMP,
+ ReadableMetadata.EVENT_TIMESTAMP,
+ ReadableMetadata.OP_TYPE,
+ ReadableMetadata.IS_DDL,
+ ReadableMetadata.MYSQL_TYPE,
+ ReadableMetadata.BATCH_ID,
+ ReadableMetadata.UPDATE_BEFORE
+ ).collect(Collectors.toList());
+
+ public static final List<WriteableMetadata> WRITEABLE_METADATA =
+ Stream.of(
+ WriteableMetadata.DATABASE,
+ WriteableMetadata.TABLE,
+ WriteableMetadata.SQL_TYPE,
+ WriteableMetadata.PK_NAMES,
+ WriteableMetadata.INGESTION_TIMESTAMP,
+ WriteableMetadata.EVENT_TIMESTAMP,
+ WriteableMetadata.OP_TYPE,
+ WriteableMetadata.IS_DDL,
+ WriteableMetadata.MYSQL_TYPE,
+ WriteableMetadata.BATCH_ID,
+ WriteableMetadata.UPDATE_BEFORE
+ ).collect(Collectors.toList());
+
+ @Test
+ public void testSerDeWithMetadata() throws Exception {
+ List<String> lines = readLines("canal-json-inlong-data.txt");
+ DeserializationSchema<RowData> deserializationSchema = createCanalJsonDeserializationSchema(
+ PHYSICAL_DATA_TYPE, READABLE_METADATA);
+ // deserialize
+ SimpleCollector out = new SimpleCollector();
+ for (String line : lines) {
+ deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), out);
+ }
+ List<RowData> res = out.result();
+
+ // serialize
+ SerializationSchema<RowData> serializationSchema = createCanalJsonSerializationSchema(
+ PHYSICAL_DATA_TYPE, WRITEABLE_METADATA);
+ serializationSchema.open(null);
+ for (int i = 0; i < lines.size(); i++) {
+ String json = new String(serializationSchema.serialize(res.get(i)), StandardCharsets.UTF_8);
+ compareJson(json, lines.get(i));
+ }
+ }
+
+ // =======================================Utils=======================================================
+
+ private CanalJsonEnhancedDeserializationSchema createCanalJsonDeserializationSchema(
+ DataType physicalDataType, List<ReadableMetadata> requestedMetadata) {
+ final DataType producedDataType =
+ DataTypeUtils.appendRowFields(
+ physicalDataType,
+ requestedMetadata.stream()
+ .map(m -> DataTypes.FIELD(m.key, m.dataType))
+ .collect(Collectors.toList()));
+ return CanalJsonEnhancedDeserializationSchema.builder(
+ PHYSICAL_DATA_TYPE,
+ requestedMetadata,
+ InternalTypeInfo.of(producedDataType.getLogicalType()))
+ .setDatabase(DATABASE)
+ .setTable(TABLE)
+ .setIgnoreParseErrors(JsonOptions.IGNORE_PARSE_ERRORS.defaultValue())
+ .setTimestampFormat(TimestampFormat.valueOf(CanalJsonOptions.TIMESTAMP_FORMAT.defaultValue()))
+ .build();
+ }
+
+ private CanalJsonEnhancedSerializationSchema createCanalJsonSerializationSchema(
+ DataType physicalDataType, List<WriteableMetadata> requestedMetadata) {
+ return new CanalJsonEnhancedSerializationSchema(
+ physicalDataType,
+ requestedMetadata,
+ TimestampFormat.valueOf(CanalJsonOptions.TIMESTAMP_FORMAT.defaultValue()),
+ JsonOptions.MapNullKeyMode.valueOf(CanalJsonOptions.JSON_MAP_NULL_KEY_MODE.defaultValue()),
+ CanalJsonOptions.JSON_MAP_NULL_KEY_LITERAL.defaultValue(),
+ JsonOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER.defaultValue());
+ }
+
+ private static List<String> readLines(String resource) throws IOException {
+ final URL url = CanalJsonEnhancedSerDeSchemaTest.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllLines(path);
+ }
+
+ private static List<RowData> readRowDatas(String resource) throws IOException, ClassNotFoundException {
+ final URL url = CanalJsonEnhancedSerDeSchemaTest.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(path.toString()))) {
+ return (List<RowData>) in.readObject();
+ }
+ }
+
+ public void compareJson(String json1, String json2) throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode node1 = objectMapper.readTree(json1);
+ JsonNode node2 = objectMapper.readTree(json2);
+ assertEquals(node1, node2);
+ }
+
+ private static class SimpleCollector implements Collector<RowData> {
+
+ private List<RowData> list = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ list.add(record);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ public List<RowData> result() {
+ List<RowData> newList = new ArrayList<>(list);
+ list.clear();
+ return newList;
+ }
+ }
+
+}
diff --git a/inlong-sort/sort-formats/format-json/src/test/resources/canal-json-inlong-data.txt b/inlong-sort/sort-formats/format-json/src/test/resources/canal-json-inlong-data.txt
new file mode 100644
index 000000000..0181fa17b
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/resources/canal-json-inlong-data.txt
@@ -0,0 +1,3 @@
+{"data":[{"id":2,"name":"xixi"}],"type":"INSERT","pkNames":["id"],"database":"TEST","mysqlType":{"name":"VARCHAR(63)","id":"BIGINT(20)"},"opType":"INSERT","es":0,"batchId":0,"sqlType":{"name":12,"id":-5},"updateBefore":null,"ts":1651208731718,"isDdl":false,"table":"TEST"}
+{"data":[{"id":1,"name":"oooooo"}],"type":"INSERT","pkNames":["id"],"database":"TEST","mysqlType":{"name":"VARCHAR(63)","id":"BIGINT(20)"},"opType":"INSERT","es":0,"batchId":1,"sqlType":{"name":12,"id":-5},"updateBefore":null,"ts":1651208731717,"isDdl":false,"table":"TEST"}
+{"data":[{"id":3,"name":"HAHA"}],"type":"INSERT","pkNames":["id"],"database":"TEST","mysqlType":{"name":"VARCHAR(63)","id":"BIGINT(20)"},"opType":"INSERT","es":1651208797000,"batchId":2,"sqlType":{"name":12,"id":-5},"updateBefore":null,"ts":1651208799752,"isDdl":false,"table":"TEST"}
\ No newline at end of file
diff --git a/inlong-sort/sort-formats/format-json/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-json/src/test/resources/log4j-test.properties
new file mode 100644
index 000000000..a4640f721
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=INFO, testlogger
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-4r [%t] %-5p %c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+# Resource leak detector only works with logging enabled at error level
+log4j.logger.org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector=ERROR, testlogger
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 648bb9c84..d95ff8439 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -130,6 +130,20 @@
<scope>test</scope>
</dependency>
+ <!--flink dependency-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 726fed249..ec69e894c 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -123,11 +123,6 @@
</dependency>
<!--flink-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${flink.scala.binary.version}</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 7f3d5b23e..948f08868 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -63,7 +63,7 @@ import java.io.IOException;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuilder.buildPulsarSource;
import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuilder.buildTDMQPulsarSource;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index c8294ebd9..e481b09ff 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -663,35 +663,33 @@ public class FlinkSqlParser implements Parser {
case MYSQL_METADATA_DATABASE:
metaType = "STRING METADATA FROM 'value.database'";
break;
- case MYSQL_METADATA_EVENT_TIME:
- metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'";
+ case METADATA_SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
break;
- case MYSQL_METADATA_EVENT_TYPE:
- metaType = "STRING METADATA FROM 'value.op_type'";
+ case METADATA_PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
break;
- case MYSQL_METADATA_DATA:
- metaType = "STRING METADATA FROM 'value.data'";
+ case METADATA_TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
break;
- case MYSQL_METADATA_IS_DDL:
- metaType = "BOOLEAN METADATA FROM 'value.is_ddl'";
+ case MYSQL_METADATA_EVENT_TIME:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'";
break;
- case METADATA_TS:
- metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'";
+ // additional metadata introduced by the enhanced canal-json format
+ case MYSQL_METADATA_EVENT_TYPE:
+ metaType = "STRING METADATA FROM 'value.op-type'";
break;
- case METADATA_SQL_TYPE:
- metaType = "MAP<STRING, INT> METADATA FROM 'value.sql_type'";
+ case MYSQL_METADATA_IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
break;
case METADATA_MYSQL_TYPE:
- metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql_type'";
- break;
- case METADATA_PK_NAMES:
- metaType = "ARRAY<STRING> METADATA FROM 'value.pk_names'";
+ metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
break;
case METADATA_BATCH_ID:
- metaType = "BIGINT METADATA FROM 'value.batch_id'";
+ metaType = "BIGINT METADATA FROM 'value.batch-id'";
break;
case METADATA_UPDATE_BEFORE:
- metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before'";
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
break;
default:
metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
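Note: the switch above now emits the kebab-case metadata keys of the enhanced canal-json format. A hedged illustration of the sink DDL that FlinkSqlParser ends up generating after this change (table name, physical columns and connector options are made up; only the METADATA clauses mirror the mapping above, and 'canal-json-inlong' is the format identifier suggested by this commit's test resources):

    // Assembled as a plain Java string for illustration only.
    String sinkDdl =
            "CREATE TABLE `kafka_sink` (\n"
            + "    `id` INT,\n"
            + "    `name` STRING,\n"
            + "    `database` STRING METADATA FROM 'value.database',\n"
            + "    `op_type` STRING METADATA FROM 'value.op-type',\n"
            + "    `is_ddl` BOOLEAN METADATA FROM 'value.is-ddl',\n"
            + "    `batch_id` BIGINT METADATA FROM 'value.batch-id'\n"
            + ") WITH (\n"
            + "    'connector' = 'kafka',\n"
            + "    'format' = 'canal-json-inlong'\n"
            + ")";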
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/KafkaSinkTestBase.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/KafkaSinkTestBase.java
index 1b9834669..fcd0279ff 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/KafkaSinkTestBase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
@@ -64,7 +64,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
import static org.junit.Assert.assertNull;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToAvroKafkaSinkTest.java
similarity index 99%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToAvroKafkaSinkTest.java
index 2a084170c..634cf489a 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToAvroKafkaSinkTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToCanalKafkaSinkTest.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToCanalKafkaSinkTest.java
index ebc8e062e..6e5cc6965 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToCanalKafkaSinkTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
index c587acbf6..085585d41 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToJsonKafkaSinkTest.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToJsonKafkaSinkTest.java
index 66c964b8f..4197da0a6 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToStringKafkaSinkTest.java
similarity index 97%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToStringKafkaSinkTest.java
index 999f8b511..46374bf0b 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToStringKafkaSinkTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index 14ca70205..ece7f5ce3 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -166,5 +166,4 @@ public class FlinkSqlParserTest extends AbstractTestBase {
FlinkSqlParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
-
}
diff --git a/pom.xml b/pom.xml
index 722852659..d24341113 100644
--- a/pom.xml
+++ b/pom.xml
@@ -959,6 +959,12 @@
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.connector.mysql.cdc.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
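Note: flink-test-utils-junit mainly contributes JUnit helpers such as org.apache.flink.util.TestLogger, which delimits each test's output in the build logs. A minimal sketch of how a test might use it (the test class and assertion are made up):

    import org.apache.flink.util.TestLogger;
    import org.junit.Assert;
    import org.junit.Test;

    public class CanalJsonEnhancedSmokeTest extends TestLogger {

        @Test
        public void loggerIsAvailable() {
            // TestLogger exposes a per-class SLF4J logger named `log`.
            Assert.assertNotNull(log);
        }
    }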
<!-- qcloud -->
<dependency>