You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/10 06:49:25 UTC
[incubator-seatunnel] branch dev updated: [Improve][Translation][Flink] Optimize code structure & remove redundant code (#4527)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fbb8270b2 [Improve][Translation][Flink] Optimize code structure & remove redundant code (#4527)
fbb8270b2 is described below
commit fbb8270b27fb106802c3f3e1cc6eabc5cbc04c2b
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Apr 10 14:49:18 2023 +0800
[Improve][Translation][Flink] Optimize code structure & remove redundant code (#4527)
---
.../seatunnel-translation-flink/pom.xml | 1 +
.../seatunnel-translation-flink-13/pom.xml | 14 +-
.../translation/flink/sink/FlinkSink.java | 101 ----------
.../translation/flink/sink/FlinkSinkWriter.java | 95 ----------
.../flink/utils/TypeConverterUtils.java | 205 ---------------------
.../seatunnel-translation-flink-15/pom.xml | 8 +-
.../serialization/CommitWrapperSerializer.java | 74 --------
.../flink/serialization/FlinkRowConverter.java | 129 -------------
.../FlinkSimpleVersionedSerializer.java | 56 ------
.../serialization/FlinkWriterStateSerializer.java | 77 --------
.../flink/serialization/KryoTypeInfo.java | 41 -----
.../flink/serialization/WrappedRow.java | 54 ------
.../translation/flink/sink/CommitWrapper.java | 36 ----
.../translation/flink/sink/FlinkCommitter.java | 64 -------
.../flink/sink/FlinkGlobalCommitter.java | 80 --------
.../translation/flink/sink/FlinkWriterState.java | 54 ------
.../flink/source/BaseSeaTunnelSourceFunction.java | 180 ------------------
.../translation/flink/source/RowCollector.java | 58 ------
.../flink/source/SeaTunnelCoordinatedSource.java | 45 -----
.../flink/source/SeaTunnelParallelSource.java | 52 ------
.../pom.xml | 5 +-
.../serialization/CommitWrapperSerializer.java | 0
.../flink/serialization/FlinkRowConverter.java | 0
.../FlinkSimpleVersionedSerializer.java | 0
.../serialization/FlinkWriterStateSerializer.java | 0
.../flink/serialization/KryoTypeInfo.java | 0
.../flink/serialization/WrappedRow.java | 0
.../translation/flink/sink/CommitWrapper.java | 0
.../translation/flink/sink/FlinkCommitter.java | 0
.../flink/sink/FlinkGlobalCommitter.java | 0
.../translation/flink/sink/FlinkSink.java | 0
.../translation/flink/sink/FlinkSinkWriter.java | 0
.../translation/flink/sink/FlinkWriterState.java | 0
.../flink/source/BaseSeaTunnelSourceFunction.java | 0
.../translation/flink/source/RowCollector.java | 0
.../flink/source/SeaTunnelCoordinatedSource.java | 0
.../flink/source/SeaTunnelParallelSource.java | 0
.../flink/utils/TypeConverterUtils.java | 0
38 files changed, 24 insertions(+), 1405 deletions(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index b93aa621c..8c4b28bb0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -29,6 +29,7 @@
<modules>
<module>seatunnel-translation-flink-13</module>
<module>seatunnel-translation-flink-15</module>
+ <module>seatunnel-translation-flink-common</module>
</modules>
<dependencies>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
index 2dbfd1282..b16072804 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
@@ -23,10 +23,22 @@
</parent>
<artifactId>seatunnel-translation-flink-13</artifactId>
- <name>SeaTunnel : Translation : Flink : 1.3</name>
+ <name>SeaTunnel : Translation : Flink : 1.13</name>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-flink-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
deleted file mode 100644
index 31f286938..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer;
-import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
-import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * The sink implementation of {@link Sink}, the entrypoint of flink sink translation
- *
- * @param <InputT> The generic type of input data
- * @param <CommT> The generic type of commit message
- * @param <WriterStateT> The generic type of writer state
- * @param <GlobalCommT> The generic type of global commit message
- */
-public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
- implements Sink<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>, GlobalCommT> {
-
- private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
-
- public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink) {
- this.sink = sink;
- }
-
- @Override
- public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(
- org.apache.flink.api.connector.sink.Sink.InitContext context,
- List<FlinkWriterState<WriterStateT>> states)
- throws IOException {
- org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
- new DefaultSinkWriterContext(context.getSubtaskId());
-
- if (states == null || states.isEmpty()) {
- return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, sink.getConsumedType());
- } else {
- List<WriterStateT> restoredState =
- states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
- return new FlinkSinkWriter<>(
- sink.restoreWriter(stContext, restoredState),
- states.get(0).getCheckpointId(),
- sink.getConsumedType());
- }
- }
-
- @Override
- public Optional<Committer<CommitWrapper<CommT>>> createCommitter() throws IOException {
- return sink.createCommitter().map(FlinkCommitter::new);
- }
-
- @Override
- public Optional<GlobalCommitter<CommitWrapper<CommT>, GlobalCommT>> createGlobalCommitter()
- throws IOException {
- return sink.createAggregatedCommitter().map(FlinkGlobalCommitter::new);
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<CommitWrapper<CommT>>> getCommittableSerializer() {
- return sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
- return sink.getAggregatedCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<FlinkWriterState<WriterStateT>>>
- getWriterStateSerializer() {
- return sink.getWriterStateSerializer().map(FlinkWriterStateSerializer::new);
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
deleted file mode 100644
index 2a6928101..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * The sink writer implementation of {@link SinkWriter}, which is created by {@link
- * Sink#createWriter}
- *
- * @param <InputT> The generic type of input data
- * @param <CommT> The generic type of commit message
- * @param <WriterStateT> The generic type of writer state
- */
-public class FlinkSinkWriter<InputT, CommT, WriterStateT>
- implements SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> {
-
- private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT>
- sinkWriter;
- private final FlinkRowConverter rowSerialization;
- private long checkpointId;
-
- FlinkSinkWriter(
- org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter,
- long checkpointId,
- SeaTunnelDataType<?> dataType) {
- this.sinkWriter = sinkWriter;
- this.checkpointId = checkpointId;
- this.rowSerialization = new FlinkRowConverter(dataType);
- }
-
- @Override
- public void write(
- InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context)
- throws IOException {
- if (element instanceof Row) {
- sinkWriter.write(rowSerialization.reconvert((Row) element));
- } else {
- throw new InvalidClassException(
- "only support Flink Row at now, the element Class is " + element.getClass());
- }
- }
-
- @Override
- public List<CommitWrapper<CommT>> prepareCommit(boolean flush) throws IOException {
- Optional<CommT> commTOptional = sinkWriter.prepareCommit();
- return commTOptional
- .map(CommitWrapper::new)
- .map(Collections::singletonList)
- .orElse(Collections.emptyList());
- }
-
- @Override
- public List<FlinkWriterState<WriterStateT>> snapshotState() throws IOException {
- List<FlinkWriterState<WriterStateT>> states =
- sinkWriter.snapshotState(this.checkpointId).stream()
- .map(state -> new FlinkWriterState<>(this.checkpointId, state))
- .collect(Collectors.toList());
- this.checkpointId++;
- return states;
- }
-
- @Override
- public void close() throws Exception {
- sinkWriter.close();
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
deleted file mode 100644
index bd0199445..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.utils;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-@SuppressWarnings("checkstyle:MagicNumber")
-public class TypeConverterUtils {
-
- private static final Map<Class<?>, BridgedType> BRIDGED_TYPES = new HashMap<>(32);
-
- static {
- // basic types
- BRIDGED_TYPES.put(
- String.class,
- BridgedType.of(BasicType.STRING_TYPE, BasicTypeInfo.STRING_TYPE_INFO));
- BRIDGED_TYPES.put(
- Boolean.class,
- BridgedType.of(BasicType.BOOLEAN_TYPE, BasicTypeInfo.BOOLEAN_TYPE_INFO));
- BRIDGED_TYPES.put(
- Byte.class, BridgedType.of(BasicType.BYTE_TYPE, BasicTypeInfo.BYTE_TYPE_INFO));
- BRIDGED_TYPES.put(
- Short.class, BridgedType.of(BasicType.SHORT_TYPE, BasicTypeInfo.SHORT_TYPE_INFO));
- BRIDGED_TYPES.put(
- Integer.class, BridgedType.of(BasicType.INT_TYPE, BasicTypeInfo.INT_TYPE_INFO));
- BRIDGED_TYPES.put(
- Long.class, BridgedType.of(BasicType.LONG_TYPE, BasicTypeInfo.LONG_TYPE_INFO));
- BRIDGED_TYPES.put(
- Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
- BRIDGED_TYPES.put(
- Double.class,
- BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
- BRIDGED_TYPES.put(
- Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
- // TODO: there is a still an unresolved issue that the BigDecimal type will lose the
- // precision and scale
- BRIDGED_TYPES.put(
- BigDecimal.class,
- BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
- // data time types
- BRIDGED_TYPES.put(
- LocalDate.class,
- BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
- BRIDGED_TYPES.put(
- LocalTime.class,
- BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME));
- BRIDGED_TYPES.put(
- LocalDateTime.class,
- BridgedType.of(
- LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeTypeInfo.LOCAL_DATE_TIME));
- // basic array types
- BRIDGED_TYPES.put(
- byte[].class,
- BridgedType.of(
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- String[].class,
- BridgedType.of(
- ArrayType.STRING_ARRAY_TYPE, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Boolean[].class,
- BridgedType.of(
- ArrayType.BOOLEAN_ARRAY_TYPE, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Byte[].class,
- BridgedType.of(ArrayType.BYTE_ARRAY_TYPE, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Short[].class,
- BridgedType.of(
- ArrayType.SHORT_ARRAY_TYPE, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Integer[].class,
- BridgedType.of(ArrayType.INT_ARRAY_TYPE, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Long[].class,
- BridgedType.of(ArrayType.LONG_ARRAY_TYPE, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Float[].class,
- BridgedType.of(
- ArrayType.FLOAT_ARRAY_TYPE, BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO));
- BRIDGED_TYPES.put(
- Double[].class,
- BridgedType.of(
- ArrayType.DOUBLE_ARRAY_TYPE, BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO));
- }
-
- private TypeConverterUtils() {
- throw new UnsupportedOperationException(
- "TypeConverterUtils is a utility class and cannot be instantiated");
- }
-
- public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
- BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
- if (bridgedType != null) {
- return bridgedType.getSeaTunnelType();
- }
- if (dataType instanceof BigDecimalTypeInfo) {
- BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) dataType;
- return new DecimalType(decimalType.precision(), decimalType.scale());
- }
- if (dataType instanceof MapTypeInfo) {
- MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
- return new MapType<>(
- convert(mapTypeInfo.getKeyTypeInfo()), convert(mapTypeInfo.getValueTypeInfo()));
- }
- if (dataType instanceof RowTypeInfo) {
- RowTypeInfo typeInformation = (RowTypeInfo) dataType;
- String[] fieldNames = typeInformation.getFieldNames();
- SeaTunnelDataType<?>[] seaTunnelDataTypes =
- Arrays.stream(typeInformation.getFieldTypes())
- .map(TypeConverterUtils::convert)
- .toArray(SeaTunnelDataType[]::new);
- return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
- }
- throw new IllegalArgumentException("Unsupported Flink's data type: " + dataType);
- }
-
- public static TypeInformation<?> convert(SeaTunnelDataType<?> dataType) {
- BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
- if (bridgedType != null) {
- return bridgedType.getFlinkType();
- }
- if (dataType instanceof DecimalType) {
- DecimalType decimalType = (DecimalType) dataType;
- return new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
- }
- if (dataType instanceof MapType) {
- MapType<?, ?> mapType = (MapType<?, ?>) dataType;
- return new MapTypeInfo<>(
- convert(mapType.getKeyType()), convert(mapType.getValueType()));
- }
- if (dataType instanceof SeaTunnelRowType) {
- SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- TypeInformation<?>[] types =
- Arrays.stream(rowType.getFieldTypes())
- .map(TypeConverterUtils::convert)
- .toArray(TypeInformation[]::new);
- return new RowTypeInfo(types, rowType.getFieldNames());
- }
- throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
- }
-
- public static class BridgedType {
- private final SeaTunnelDataType<?> seaTunnelType;
- private final TypeInformation<?> flinkType;
-
- private BridgedType(SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
- this.seaTunnelType = seaTunnelType;
- this.flinkType = flinkType;
- }
-
- public static BridgedType of(
- SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
- return new BridgedType(seaTunnelType, flinkType);
- }
-
- public TypeInformation<?> getFlinkType() {
- return flinkType;
- }
-
- public SeaTunnelDataType<?> getSeaTunnelType() {
- return seaTunnelType;
- }
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
index 03aa1ddae..50842d5a9 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
@@ -23,10 +23,16 @@
</parent>
<artifactId>seatunnel-translation-flink-15</artifactId>
- <name>SeaTunnel : Translation : Flink : 1.5</name>
+ <name>SeaTunnel : Translation : Flink : 1.15</name>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-flink-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
deleted file mode 100644
index 23b3681ec..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * The serializer wrapper of the commit message serializer, which is created by {@link
- * Sink#getCommittableSerializer()}, used to unify the different implementations of {@link
- * Serializer}
- *
- * @param <T> The generic type of commit message
- */
-public class CommitWrapperSerializer<T> implements SimpleVersionedSerializer<CommitWrapper<T>> {
- private final Serializer<T> serializer;
-
- public CommitWrapperSerializer(Serializer<T> serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public int getVersion() {
- return 0;
- }
-
- @Override
- public byte[] serialize(CommitWrapper<T> commitWrapper) throws IOException {
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
- byte[] serialize = serializer.serialize(commitWrapper.getCommit());
- out.writeInt(serialize.length);
- out.write(serialize);
- out.flush();
- return baos.toByteArray();
- }
- }
-
- @Override
- public CommitWrapper<T> deserialize(int version, byte[] serialized) throws IOException {
- try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
- final DataInputStream in = new DataInputStream(bais)) {
- final int size = in.readInt();
- final byte[] stateBytes = new byte[size];
- in.read(stateBytes);
- T commitT = serializer.deserialize(stateBytes);
- return new CommitWrapper<>(commitT);
- }
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
deleted file mode 100644
index fa8d88e05..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.translation.serialization.RowConverter;
-
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-/**
- * The row converter between {@link Row} and {@link SeaTunnelRow}, used to convert or reconvert
- * between flink row and seatunnel row
- */
-public class FlinkRowConverter extends RowConverter<Row> {
-
- public FlinkRowConverter(SeaTunnelDataType<?> dataType) {
- super(dataType);
- }
-
- @Override
- public Row convert(SeaTunnelRow seaTunnelRow) throws IOException {
- validate(seaTunnelRow);
- return (Row) convert(seaTunnelRow, dataType);
- }
-
- private static Object convert(Object field, SeaTunnelDataType<?> dataType) {
- if (field == null) {
- return null;
- }
- SqlType sqlType = dataType.getSqlType();
- switch (sqlType) {
- case ROW:
- SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
- SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- int arity = rowType.getTotalFields();
- Row engineRow = new Row(arity);
- for (int i = 0; i < arity; i++) {
- engineRow.setField(
- i, convert(seaTunnelRow.getField(i), rowType.getFieldType(i)));
- }
- engineRow.setKind(RowKind.fromByteValue(seaTunnelRow.getRowKind().toByteValue()));
- return engineRow;
- case MAP:
- return convertMap(
- (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::convert);
- default:
- return field;
- }
- }
-
- private static Object convertMap(
- Map<?, ?> mapData,
- MapType<?, ?> mapType,
- BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
- if (mapData == null || mapData.size() == 0) {
- return mapData;
- }
- switch (mapType.getValueType().getSqlType()) {
- case MAP:
- case ROW:
- Map<Object, Object> newMap = new HashMap<>(mapData.size());
- mapData.forEach(
- (key, value) -> {
- SeaTunnelDataType<?> valueType = mapType.getValueType();
- newMap.put(key, convertFunction.apply(value, valueType));
- });
- return newMap;
- default:
- return mapData;
- }
- }
-
- @Override
- public SeaTunnelRow reconvert(Row engineRow) throws IOException {
- return (SeaTunnelRow) reconvert(engineRow, dataType);
- }
-
- private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
- if (field == null) {
- return null;
- }
- SqlType sqlType = dataType.getSqlType();
- switch (sqlType) {
- case ROW:
- Row engineRow = (Row) field;
- SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
- int arity = rowType.getTotalFields();
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(arity);
- for (int i = 0; i < arity; i++) {
- seaTunnelRow.setField(
- i, reconvert(engineRow.getField(i), rowType.getFieldType(i)));
- }
- seaTunnelRow.setRowKind(
- org.apache.seatunnel.api.table.type.RowKind.fromByteValue(
- engineRow.getKind().toByteValue()));
- return seaTunnelRow;
- case MAP:
- return convertMap(
- (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::reconvert);
- default:
- return field;
- }
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
deleted file mode 100644
index e5f054761..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-
-/**
- * The serializer wrapper of aggregate commit message serializer, which is created by {@link
- * Sink#getGlobalCommittableSerializer()}, used to unify the different implementations of {@link
- * Serializer}
- *
- * @param <T> The generic type of aggregate commit message
- */
-public class FlinkSimpleVersionedSerializer<T> implements SimpleVersionedSerializer<T> {
-
- private final Serializer<T> serializer;
-
- public FlinkSimpleVersionedSerializer(Serializer<T> serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public int getVersion() {
- return 0;
- }
-
- @Override
- public byte[] serialize(T obj) throws IOException {
- return serializer.serialize(obj);
- }
-
- @Override
- public T deserialize(int version, byte[] serialized) throws IOException {
- return serializer.deserialize(serialized);
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
deleted file mode 100644
index ce938ed0c..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.translation.flink.sink.FlinkWriterState;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * The serializer wrapper of writer state serializer, which is created by {@link
- * Sink#getWriterStateSerializer()}, used to unify the different implementations of {@link
- * Serializer}
- *
- * @param <T> The generic type of writer state
- */
-public class FlinkWriterStateSerializer<T>
- implements SimpleVersionedSerializer<FlinkWriterState<T>> {
- private final Serializer<T> serializer;
-
- public FlinkWriterStateSerializer(Serializer<T> serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public int getVersion() {
- return 0;
- }
-
- @Override
- public byte[] serialize(FlinkWriterState<T> state) throws IOException {
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
- out.writeLong(state.getCheckpointId());
- byte[] serialize = serializer.serialize(state.getState());
- out.writeInt(serialize.length);
- out.write(serialize);
- out.flush();
- return baos.toByteArray();
- }
- }
-
- @Override
- public FlinkWriterState<T> deserialize(int version, byte[] serialized) throws IOException {
- try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
- final DataInputStream in = new DataInputStream(bais)) {
- final long checkpointId = in.readLong();
- final int size = in.readInt();
- final byte[] stateBytes = new byte[size];
- in.read(stateBytes);
- T stateT = serializer.deserialize(stateBytes);
- return new FlinkWriterState<>(checkpointId, stateT);
- }
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
deleted file mode 100644
index 20530f241..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-/**
- * Operator chaining can avoid serialization and deserialization while the job is running. However,
- * the Flink Job initializes to determine if it is a generic type. Disabling serialization of
- * generic types causes an exception, even though the operator does not serialize at run time.
- */
-public class KryoTypeInfo<T> extends GenericTypeInfo<T> {
- private static final long serialVersionUID = -4367528355992922603L;
-
- public KryoTypeInfo(Class<T> typeClass) {
- super(typeClass);
- }
-
- @Override
- public TypeSerializer<T> createSerializer(ExecutionConfig config) {
- return new KryoSerializer<T>(getTypeClass(), config);
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
deleted file mode 100644
index c00648580..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Row;
-
-/**
- * Wrapped {@link Row}.
- *
- * <p>Keep the original table name for the Dispatcher to distribute to the corresponding data stream
- */
-public class WrappedRow extends Tuple2<Row, String> {
- private static final long serialVersionUID = -8325988931728734377L;
-
- public WrappedRow() {
- super();
- }
-
- public WrappedRow(Row row, String table) {
- super(row, table);
- }
-
- public Row getRow() {
- return this.f0;
- }
-
- public String getTable() {
- return this.f1;
- }
-
- public void setRow(Row row) {
- this.f0 = row;
- }
-
- public void setTable(String table) {
- this.f1 = table;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
deleted file mode 100644
index c89496143..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-/**
- * The commit message wrapper, which is used to wrapper the different commit messages and unify the
- * different implementations of {@link CommitT}
- *
- * @param <CommitT> The generic type of commit message
- */
-public class CommitWrapper<CommitT> {
- private final CommitT commit;
-
- public CommitWrapper(CommitT commit) {
- this.commit = commit;
- }
-
- public CommitT getCommit() {
- return commit;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
deleted file mode 100644
index 3bbb2633a..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.sink.SinkCommitter;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.Sink;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * The committer wrapper of {@link SinkCommitter}, which is created by {@link
- * Sink#createCommitter()}, used to unify the different sink committer implementations
- *
- * @param <CommT> The generic type of commit message
- */
-@Slf4j
-public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
-
- private final SinkCommitter<CommT> sinkCommitter;
-
- FlinkCommitter(SinkCommitter<CommT> sinkCommitter) {
- this.sinkCommitter = sinkCommitter;
- }
-
- @Override
- public List<CommitWrapper<CommT>> commit(List<CommitWrapper<CommT>> committables)
- throws IOException {
- List<CommT> reCommittable =
- sinkCommitter.commit(
- committables.stream()
- .map(CommitWrapper::getCommit)
- .collect(Collectors.toList()));
- if (reCommittable != null && !reCommittable.isEmpty()) {
- log.warn("this version not support re-commit when use flink engine");
- }
- // TODO re-commit the data
- return new ArrayList<>();
- }
-
- @Override
- public void close() throws Exception {}
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
deleted file mode 100644
index 7c95735e1..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * The committer wrapper of {@link SinkAggregatedCommitter}, which is created by {@link
- * Sink#createGlobalCommitter()}, used to unify the different implementations of {@link
- * SinkAggregatedCommitter}
- *
- * @param <CommT> The generic type of commit message type
- * @param <GlobalCommT> The generic type of global commit message type
- */
-@Slf4j
-public class FlinkGlobalCommitter<CommT, GlobalCommT>
- implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
-
- private final SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
-
- FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
- this.aggregatedCommitter = aggregatedCommitter;
- }
-
- @Override
- public List<GlobalCommT> filterRecoveredCommittables(List globalCommittables)
- throws IOException {
- return Collections.emptyList();
- }
-
- @Override
- public GlobalCommT combine(List<CommitWrapper<CommT>> committables) throws IOException {
- return aggregatedCommitter.combine(
- committables.stream().map(CommitWrapper::getCommit).collect(Collectors.toList()));
- }
-
- @Override
- public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
- List<GlobalCommT> reCommittable = aggregatedCommitter.commit(globalCommittables);
- if (reCommittable != null && !reCommittable.isEmpty()) {
- log.warn("this version not support re-commit when use flink engine");
- }
- // TODO re-commit the data
- return new ArrayList<>();
- }
-
- @Override
- public void endOfInput() throws IOException {}
-
- @Override
- public void close() throws Exception {
- aggregatedCommitter.close();
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
deleted file mode 100644
index 05fe17144..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import java.io.Serializable;
-
-/**
- * The writer state wrapper of {@link StateT}, used to unify the different implementations of {@link
- * StateT}
- *
- * @param <StateT> The generic type of the writer state
- */
-public class FlinkWriterState<StateT> implements Serializable {
-
- private long checkpointId = 0;
-
- private StateT state;
-
- public FlinkWriterState(long checkpointId, StateT state) {
- this.checkpointId = checkpointId;
- this.state = state;
- }
-
- public long getCheckpointId() {
- return checkpointId;
- }
-
- public void setCheckpointId(long checkpointId) {
- this.checkpointId = checkpointId;
- }
-
- public StateT getState() {
- return state;
- }
-
- public void setState(StateT state) {
- this.state = state;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
deleted file mode 100644
index f0c6fc2ae..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-import org.apache.seatunnel.translation.source.BaseSourceFunction;
-
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.types.Row;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * The abstract implementation of {@link RichSourceFunction}, the entrypoint of flink source
- * translation
- */
-public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row>
- implements CheckpointListener, ResultTypeQueryable<Row>, CheckpointedFunction {
- private static final Logger LOG = LoggerFactory.getLogger(BaseSeaTunnelSourceFunction.class);
-
- protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
- protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
-
- protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
- protected transient volatile Map<Integer, List<byte[]>> restoredState;
-
- protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
- protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
-
- /** Flag indicating whether the consumer is still running. */
- private volatile boolean running = true;
-
- public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
- this.source = source;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.internalSource = createInternalSource();
- this.internalSource.open();
- }
-
- protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();
-
- @SuppressWarnings("checkstyle:MagicNumber")
- @Override
- public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
- internalSource.run(
- new RowCollector(
- sourceContext,
- sourceContext.getCheckpointLock(),
- source.getProducedType()));
- // Wait for a checkpoint to complete:
- // In the current version(version < 1.14.0), when the operator state of the source changes
- // to FINISHED, jobs cannot be checkpoint executed.
- final long prevCheckpointId = latestTriggerCheckpointId.get();
- // Ensured Checkpoint enabled
- if (getRuntimeContext() instanceof StreamingRuntimeContext
- && ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
- while (running && prevCheckpointId >= latestCompletedCheckpointId.get()) {
- Thread.sleep(100);
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- cancel();
- LOG.debug("Close the SeaTunnelSourceFunction of Flink.");
- }
-
- @Override
- public void cancel() {
- running = false;
- try {
- if (internalSource != null) {
- LOG.debug("Cancel the SeaTunnelSourceFunction of Flink.");
- internalSource.close();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- internalSource.notifyCheckpointComplete(checkpointId);
- latestCompletedCheckpointId.set(checkpointId);
- }
-
- @Override
- public void notifyCheckpointAborted(long checkpointId) throws Exception {
- internalSource.notifyCheckpointAborted(checkpointId);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TypeInformation<Row> getProducedType() {
- return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext snapshotContext) throws Exception {
- final long checkpointId = snapshotContext.getCheckpointId();
- latestTriggerCheckpointId.set(checkpointId);
- if (!running) {
- LOG.debug("snapshotState() called on closed source");
- } else {
- sourceState.clear();
- sourceState.add(internalSource.snapshotState(checkpointId));
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
- this.restoredState = new HashMap<>();
- this.sourceState =
- initializeContext
- .getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- getStateName(),
- Types.MAP(
- BasicTypeInfo.INT_TYPE_INFO,
- Types.LIST(
- PrimitiveArrayTypeInfo
- .BYTE_PRIMITIVE_ARRAY_TYPE_INFO))));
- if (initializeContext.isRestored()) {
- // populate actual holder for restored state
- sourceState.get().forEach(map -> restoredState.putAll(map));
- LOG.info(
- "Consumer subtask {} restored state",
- getRuntimeContext().getIndexOfThisSubtask());
- } else {
- LOG.info(
- "Consumer subtask {} has no restore state.",
- getRuntimeContext().getIndexOfThisSubtask());
- }
- }
-
- protected abstract String getStateName();
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
deleted file mode 100644
index 63b70242f..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-public class RowCollector implements Collector<SeaTunnelRow> {
-
- protected final SourceFunction.SourceContext<Row> internalCollector;
- protected final FlinkRowConverter rowSerialization;
- protected final Object checkpointLock;
-
- public RowCollector(
- SourceFunction.SourceContext<Row> internalCollector,
- Object checkpointLock,
- SeaTunnelDataType<?> dataType) {
- this.internalCollector = internalCollector;
- this.checkpointLock = checkpointLock;
- this.rowSerialization = new FlinkRowConverter(dataType);
- }
-
- @Override
- public void collect(SeaTunnelRow record) {
- try {
- internalCollector.collect(rowSerialization.convert(record));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Object getCheckpointLock() {
- return this.checkpointLock;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
deleted file mode 100644
index 0d03dbf78..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.source.BaseSourceFunction;
-import org.apache.seatunnel.translation.source.CoordinatedSource;
-
-/** The coordinated source function implementation of {@link BaseSeaTunnelSourceFunction} */
-public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {
-
- protected static final String COORDINATED_SOURCE_STATE_NAME = "coordinated-source-states";
-
- public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
- // TODO: Make sure the source is coordinated.
- super(source);
- }
-
- @Override
- protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
- return new CoordinatedSource<>(
- source, restoredState, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- protected String getStateName() {
- return COORDINATED_SOURCE_STATE_NAME;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
deleted file mode 100644
index 6e1515d56..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.source.BaseSourceFunction;
-import org.apache.seatunnel.translation.source.ParallelSource;
-
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.types.Row;
-
-/** The parallel source function implementation of {@link BaseSeaTunnelSourceFunction} */
-public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction
- implements ParallelSourceFunction<Row> {
-
- protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
-
- public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
- // TODO: Make sure the source is uncoordinated.
- super(source);
- }
-
- @Override
- protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
- return new ParallelSource<>(
- source,
- restoredState,
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask());
- }
-
- @Override
- protected String getStateName() {
- return PARALLEL_SOURCE_STATE_NAME;
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
similarity index 93%
copy from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
index 03aa1ddae..de0f3702b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
@@ -22,8 +22,9 @@
<version>${revision}</version>
</parent>
- <artifactId>seatunnel-translation-flink-15</artifactId>
- <name>SeaTunnel : Translation : Flink : 1.5</name>
+ <artifactId>seatunnel-translation-flink-common</artifactId>
+ <packaging>jar</packaging>
+ <name>SeaTunnel : Translation : Flink : Common</name>
<dependencies>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java