You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/10 06:49:25 UTC

[incubator-seatunnel] branch dev updated: [Improve][Translation][Flink] Optimize code structure & remove redundant code (#4527)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fbb8270b2 [Improve][Translation][Flink] Optimize code structure & remove redundant code (#4527)
fbb8270b2 is described below

commit fbb8270b27fb106802c3f3e1cc6eabc5cbc04c2b
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Apr 10 14:49:18 2023 +0800

    [Improve][Translation][Flink] Optimize code structure & remove redundant code (#4527)
---
 .../seatunnel-translation-flink/pom.xml            |   1 +
 .../seatunnel-translation-flink-13/pom.xml         |  14 +-
 .../translation/flink/sink/FlinkSink.java          | 101 ----------
 .../translation/flink/sink/FlinkSinkWriter.java    |  95 ----------
 .../flink/utils/TypeConverterUtils.java            | 205 ---------------------
 .../seatunnel-translation-flink-15/pom.xml         |   8 +-
 .../serialization/CommitWrapperSerializer.java     |  74 --------
 .../flink/serialization/FlinkRowConverter.java     | 129 -------------
 .../FlinkSimpleVersionedSerializer.java            |  56 ------
 .../serialization/FlinkWriterStateSerializer.java  |  77 --------
 .../flink/serialization/KryoTypeInfo.java          |  41 -----
 .../flink/serialization/WrappedRow.java            |  54 ------
 .../translation/flink/sink/CommitWrapper.java      |  36 ----
 .../translation/flink/sink/FlinkCommitter.java     |  64 -------
 .../flink/sink/FlinkGlobalCommitter.java           |  80 --------
 .../translation/flink/sink/FlinkWriterState.java   |  54 ------
 .../flink/source/BaseSeaTunnelSourceFunction.java  | 180 ------------------
 .../translation/flink/source/RowCollector.java     |  58 ------
 .../flink/source/SeaTunnelCoordinatedSource.java   |  45 -----
 .../flink/source/SeaTunnelParallelSource.java      |  52 ------
 .../pom.xml                                        |   5 +-
 .../serialization/CommitWrapperSerializer.java     |   0
 .../flink/serialization/FlinkRowConverter.java     |   0
 .../FlinkSimpleVersionedSerializer.java            |   0
 .../serialization/FlinkWriterStateSerializer.java  |   0
 .../flink/serialization/KryoTypeInfo.java          |   0
 .../flink/serialization/WrappedRow.java            |   0
 .../translation/flink/sink/CommitWrapper.java      |   0
 .../translation/flink/sink/FlinkCommitter.java     |   0
 .../flink/sink/FlinkGlobalCommitter.java           |   0
 .../translation/flink/sink/FlinkSink.java          |   0
 .../translation/flink/sink/FlinkSinkWriter.java    |   0
 .../translation/flink/sink/FlinkWriterState.java   |   0
 .../flink/source/BaseSeaTunnelSourceFunction.java  |   0
 .../translation/flink/source/RowCollector.java     |   0
 .../flink/source/SeaTunnelCoordinatedSource.java   |   0
 .../flink/source/SeaTunnelParallelSource.java      |   0
 .../flink/utils/TypeConverterUtils.java            |   0
 38 files changed, 24 insertions(+), 1405 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index b93aa621c..8c4b28bb0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -29,6 +29,7 @@
     <modules>
         <module>seatunnel-translation-flink-13</module>
         <module>seatunnel-translation-flink-15</module>
+        <module>seatunnel-translation-flink-common</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
index 2dbfd1282..b16072804 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/pom.xml
@@ -23,10 +23,22 @@
     </parent>
 
     <artifactId>seatunnel-translation-flink-13</artifactId>
-    <name>SeaTunnel : Translation : Flink : 1.3</name>
+    <name>SeaTunnel : Translation : Flink : 1.13</name>
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-flink-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
deleted file mode 100644
index 31f286938..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer;
-import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
-import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * The sink implementation of {@link Sink}, the entrypoint of flink sink translation
- *
- * @param <InputT> The generic type of input data
- * @param <CommT> The generic type of commit message
- * @param <WriterStateT> The generic type of writer state
- * @param <GlobalCommT> The generic type of global commit message
- */
-public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
-        implements Sink<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>, GlobalCommT> {
-
-    private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
-
-    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink) {
-        this.sink = sink;
-    }
-
-    @Override
-    public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(
-            org.apache.flink.api.connector.sink.Sink.InitContext context,
-            List<FlinkWriterState<WriterStateT>> states)
-            throws IOException {
-        org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-                new DefaultSinkWriterContext(context.getSubtaskId());
-
-        if (states == null || states.isEmpty()) {
-            return new FlinkSinkWriter<>(sink.createWriter(stContext), 1, sink.getConsumedType());
-        } else {
-            List<WriterStateT> restoredState =
-                    states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
-            return new FlinkSinkWriter<>(
-                    sink.restoreWriter(stContext, restoredState),
-                    states.get(0).getCheckpointId(),
-                    sink.getConsumedType());
-        }
-    }
-
-    @Override
-    public Optional<Committer<CommitWrapper<CommT>>> createCommitter() throws IOException {
-        return sink.createCommitter().map(FlinkCommitter::new);
-    }
-
-    @Override
-    public Optional<GlobalCommitter<CommitWrapper<CommT>, GlobalCommT>> createGlobalCommitter()
-            throws IOException {
-        return sink.createAggregatedCommitter().map(FlinkGlobalCommitter::new);
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<CommitWrapper<CommT>>> getCommittableSerializer() {
-        return sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
-        return sink.getAggregatedCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<FlinkWriterState<WriterStateT>>>
-            getWriterStateSerializer() {
-        return sink.getWriterStateSerializer().map(FlinkWriterStateSerializer::new);
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
deleted file mode 100644
index 2a6928101..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * The sink writer implementation of {@link SinkWriter}, which is created by {@link
- * Sink#createWriter}
- *
- * @param <InputT> The generic type of input data
- * @param <CommT> The generic type of commit message
- * @param <WriterStateT> The generic type of writer state
- */
-public class FlinkSinkWriter<InputT, CommT, WriterStateT>
-        implements SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> {
-
-    private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT>
-            sinkWriter;
-    private final FlinkRowConverter rowSerialization;
-    private long checkpointId;
-
-    FlinkSinkWriter(
-            org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter,
-            long checkpointId,
-            SeaTunnelDataType<?> dataType) {
-        this.sinkWriter = sinkWriter;
-        this.checkpointId = checkpointId;
-        this.rowSerialization = new FlinkRowConverter(dataType);
-    }
-
-    @Override
-    public void write(
-            InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context)
-            throws IOException {
-        if (element instanceof Row) {
-            sinkWriter.write(rowSerialization.reconvert((Row) element));
-        } else {
-            throw new InvalidClassException(
-                    "only support Flink Row at now, the element Class is " + element.getClass());
-        }
-    }
-
-    @Override
-    public List<CommitWrapper<CommT>> prepareCommit(boolean flush) throws IOException {
-        Optional<CommT> commTOptional = sinkWriter.prepareCommit();
-        return commTOptional
-                .map(CommitWrapper::new)
-                .map(Collections::singletonList)
-                .orElse(Collections.emptyList());
-    }
-
-    @Override
-    public List<FlinkWriterState<WriterStateT>> snapshotState() throws IOException {
-        List<FlinkWriterState<WriterStateT>> states =
-                sinkWriter.snapshotState(this.checkpointId).stream()
-                        .map(state -> new FlinkWriterState<>(this.checkpointId, state))
-                        .collect(Collectors.toList());
-        this.checkpointId++;
-        return states;
-    }
-
-    @Override
-    public void close() throws Exception {
-        sinkWriter.close();
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
deleted file mode 100644
index bd0199445..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.utils;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
-
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-@SuppressWarnings("checkstyle:MagicNumber")
-public class TypeConverterUtils {
-
-    private static final Map<Class<?>, BridgedType> BRIDGED_TYPES = new HashMap<>(32);
-
-    static {
-        // basic types
-        BRIDGED_TYPES.put(
-                String.class,
-                BridgedType.of(BasicType.STRING_TYPE, BasicTypeInfo.STRING_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Boolean.class,
-                BridgedType.of(BasicType.BOOLEAN_TYPE, BasicTypeInfo.BOOLEAN_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Byte.class, BridgedType.of(BasicType.BYTE_TYPE, BasicTypeInfo.BYTE_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Short.class, BridgedType.of(BasicType.SHORT_TYPE, BasicTypeInfo.SHORT_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Integer.class, BridgedType.of(BasicType.INT_TYPE, BasicTypeInfo.INT_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Long.class, BridgedType.of(BasicType.LONG_TYPE, BasicTypeInfo.LONG_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Double.class,
-                BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
-        // TODO: there is a still an unresolved issue that the BigDecimal type will lose the
-        // precision and scale
-        BRIDGED_TYPES.put(
-                BigDecimal.class,
-                BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
-        // data time types
-        BRIDGED_TYPES.put(
-                LocalDate.class,
-                BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
-        BRIDGED_TYPES.put(
-                LocalTime.class,
-                BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME));
-        BRIDGED_TYPES.put(
-                LocalDateTime.class,
-                BridgedType.of(
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeTypeInfo.LOCAL_DATE_TIME));
-        // basic array types
-        BRIDGED_TYPES.put(
-                byte[].class,
-                BridgedType.of(
-                        PrimitiveByteArrayType.INSTANCE,
-                        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                String[].class,
-                BridgedType.of(
-                        ArrayType.STRING_ARRAY_TYPE, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Boolean[].class,
-                BridgedType.of(
-                        ArrayType.BOOLEAN_ARRAY_TYPE, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Byte[].class,
-                BridgedType.of(ArrayType.BYTE_ARRAY_TYPE, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Short[].class,
-                BridgedType.of(
-                        ArrayType.SHORT_ARRAY_TYPE, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Integer[].class,
-                BridgedType.of(ArrayType.INT_ARRAY_TYPE, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Long[].class,
-                BridgedType.of(ArrayType.LONG_ARRAY_TYPE, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Float[].class,
-                BridgedType.of(
-                        ArrayType.FLOAT_ARRAY_TYPE, BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO));
-        BRIDGED_TYPES.put(
-                Double[].class,
-                BridgedType.of(
-                        ArrayType.DOUBLE_ARRAY_TYPE, BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO));
-    }
-
-    private TypeConverterUtils() {
-        throw new UnsupportedOperationException(
-                "TypeConverterUtils is a utility class and cannot be instantiated");
-    }
-
-    public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
-        BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
-        if (bridgedType != null) {
-            return bridgedType.getSeaTunnelType();
-        }
-        if (dataType instanceof BigDecimalTypeInfo) {
-            BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) dataType;
-            return new DecimalType(decimalType.precision(), decimalType.scale());
-        }
-        if (dataType instanceof MapTypeInfo) {
-            MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
-            return new MapType<>(
-                    convert(mapTypeInfo.getKeyTypeInfo()), convert(mapTypeInfo.getValueTypeInfo()));
-        }
-        if (dataType instanceof RowTypeInfo) {
-            RowTypeInfo typeInformation = (RowTypeInfo) dataType;
-            String[] fieldNames = typeInformation.getFieldNames();
-            SeaTunnelDataType<?>[] seaTunnelDataTypes =
-                    Arrays.stream(typeInformation.getFieldTypes())
-                            .map(TypeConverterUtils::convert)
-                            .toArray(SeaTunnelDataType[]::new);
-            return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
-        }
-        throw new IllegalArgumentException("Unsupported Flink's data type: " + dataType);
-    }
-
-    public static TypeInformation<?> convert(SeaTunnelDataType<?> dataType) {
-        BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
-        if (bridgedType != null) {
-            return bridgedType.getFlinkType();
-        }
-        if (dataType instanceof DecimalType) {
-            DecimalType decimalType = (DecimalType) dataType;
-            return new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
-        }
-        if (dataType instanceof MapType) {
-            MapType<?, ?> mapType = (MapType<?, ?>) dataType;
-            return new MapTypeInfo<>(
-                    convert(mapType.getKeyType()), convert(mapType.getValueType()));
-        }
-        if (dataType instanceof SeaTunnelRowType) {
-            SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-            TypeInformation<?>[] types =
-                    Arrays.stream(rowType.getFieldTypes())
-                            .map(TypeConverterUtils::convert)
-                            .toArray(TypeInformation[]::new);
-            return new RowTypeInfo(types, rowType.getFieldNames());
-        }
-        throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
-    }
-
-    public static class BridgedType {
-        private final SeaTunnelDataType<?> seaTunnelType;
-        private final TypeInformation<?> flinkType;
-
-        private BridgedType(SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
-            this.seaTunnelType = seaTunnelType;
-            this.flinkType = flinkType;
-        }
-
-        public static BridgedType of(
-                SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
-            return new BridgedType(seaTunnelType, flinkType);
-        }
-
-        public TypeInformation<?> getFlinkType() {
-            return flinkType;
-        }
-
-        public SeaTunnelDataType<?> getSeaTunnelType() {
-            return seaTunnelType;
-        }
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
index 03aa1ddae..50842d5a9 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
@@ -23,10 +23,16 @@
     </parent>
 
     <artifactId>seatunnel-translation-flink-15</artifactId>
-    <name>SeaTunnel : Translation : Flink : 1.5</name>
+    <name>SeaTunnel : Translation : Flink : 1.15</name>
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-flink-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-java</artifactId>
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
deleted file mode 100644
index 23b3681ec..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * The serializer wrapper of the commit message serializer, which is created by {@link
- * Sink#getCommittableSerializer()}, used to unify the different implementations of {@link
- * Serializer}
- *
- * @param <T> The generic type of commit message
- */
-public class CommitWrapperSerializer<T> implements SimpleVersionedSerializer<CommitWrapper<T>> {
-    private final Serializer<T> serializer;
-
-    public CommitWrapperSerializer(Serializer<T> serializer) {
-        this.serializer = serializer;
-    }
-
-    @Override
-    public int getVersion() {
-        return 0;
-    }
-
-    @Override
-    public byte[] serialize(CommitWrapper<T> commitWrapper) throws IOException {
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                final DataOutputStream out = new DataOutputStream(baos)) {
-            byte[] serialize = serializer.serialize(commitWrapper.getCommit());
-            out.writeInt(serialize.length);
-            out.write(serialize);
-            out.flush();
-            return baos.toByteArray();
-        }
-    }
-
-    @Override
-    public CommitWrapper<T> deserialize(int version, byte[] serialized) throws IOException {
-        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
-                final DataInputStream in = new DataInputStream(bais)) {
-            final int size = in.readInt();
-            final byte[] stateBytes = new byte[size];
-            in.read(stateBytes);
-            T commitT = serializer.deserialize(stateBytes);
-            return new CommitWrapper<>(commitT);
-        }
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
deleted file mode 100644
index fa8d88e05..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.translation.serialization.RowConverter;
-
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-/**
- * The row converter between {@link Row} and {@link SeaTunnelRow}, used to convert or reconvert
- * between flink row and seatunnel row
- */
-public class FlinkRowConverter extends RowConverter<Row> {
-
-    public FlinkRowConverter(SeaTunnelDataType<?> dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public Row convert(SeaTunnelRow seaTunnelRow) throws IOException {
-        validate(seaTunnelRow);
-        return (Row) convert(seaTunnelRow, dataType);
-    }
-
-    private static Object convert(Object field, SeaTunnelDataType<?> dataType) {
-        if (field == null) {
-            return null;
-        }
-        SqlType sqlType = dataType.getSqlType();
-        switch (sqlType) {
-            case ROW:
-                SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field;
-                SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-                int arity = rowType.getTotalFields();
-                Row engineRow = new Row(arity);
-                for (int i = 0; i < arity; i++) {
-                    engineRow.setField(
-                            i, convert(seaTunnelRow.getField(i), rowType.getFieldType(i)));
-                }
-                engineRow.setKind(RowKind.fromByteValue(seaTunnelRow.getRowKind().toByteValue()));
-                return engineRow;
-            case MAP:
-                return convertMap(
-                        (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::convert);
-            default:
-                return field;
-        }
-    }
-
-    private static Object convertMap(
-            Map<?, ?> mapData,
-            MapType<?, ?> mapType,
-            BiFunction<Object, SeaTunnelDataType<?>, Object> convertFunction) {
-        if (mapData == null || mapData.size() == 0) {
-            return mapData;
-        }
-        switch (mapType.getValueType().getSqlType()) {
-            case MAP:
-            case ROW:
-                Map<Object, Object> newMap = new HashMap<>(mapData.size());
-                mapData.forEach(
-                        (key, value) -> {
-                            SeaTunnelDataType<?> valueType = mapType.getValueType();
-                            newMap.put(key, convertFunction.apply(value, valueType));
-                        });
-                return newMap;
-            default:
-                return mapData;
-        }
-    }
-
-    @Override
-    public SeaTunnelRow reconvert(Row engineRow) throws IOException {
-        return (SeaTunnelRow) reconvert(engineRow, dataType);
-    }
-
-    private static Object reconvert(Object field, SeaTunnelDataType<?> dataType) {
-        if (field == null) {
-            return null;
-        }
-        SqlType sqlType = dataType.getSqlType();
-        switch (sqlType) {
-            case ROW:
-                Row engineRow = (Row) field;
-                SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
-                int arity = rowType.getTotalFields();
-                SeaTunnelRow seaTunnelRow = new SeaTunnelRow(arity);
-                for (int i = 0; i < arity; i++) {
-                    seaTunnelRow.setField(
-                            i, reconvert(engineRow.getField(i), rowType.getFieldType(i)));
-                }
-                seaTunnelRow.setRowKind(
-                        org.apache.seatunnel.api.table.type.RowKind.fromByteValue(
-                                engineRow.getKind().toByteValue()));
-                return seaTunnelRow;
-            case MAP:
-                return convertMap(
-                        (Map<?, ?>) field, (MapType<?, ?>) dataType, FlinkRowConverter::reconvert);
-            default:
-                return field;
-        }
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
deleted file mode 100644
index e5f054761..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-
-/**
- * The serializer wrapper of aggregate commit message serializer, which is created by {@link
- * Sink#getGlobalCommittableSerializer()}, used to unify the different implementations of {@link
- * Serializer}
- *
- * @param <T> The generic type of aggregate commit message
- */
-public class FlinkSimpleVersionedSerializer<T> implements SimpleVersionedSerializer<T> {
-
-    private final Serializer<T> serializer;
-
-    public FlinkSimpleVersionedSerializer(Serializer<T> serializer) {
-        this.serializer = serializer;
-    }
-
-    @Override
-    public int getVersion() {
-        return 0;
-    }
-
-    @Override
-    public byte[] serialize(T obj) throws IOException {
-        return serializer.serialize(obj);
-    }
-
-    @Override
-    public T deserialize(int version, byte[] serialized) throws IOException {
-        return serializer.deserialize(serialized);
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
deleted file mode 100644
index ce938ed0c..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.seatunnel.api.serialization.Serializer;
-import org.apache.seatunnel.translation.flink.sink.FlinkWriterState;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * The serializer wrapper of writer state serializer, which is created by {@link
- * Sink#getWriterStateSerializer()}, used to unify the different implementations of {@link
- * Serializer}
- *
- * @param <T> The generic type of writer state
- */
-public class FlinkWriterStateSerializer<T>
-        implements SimpleVersionedSerializer<FlinkWriterState<T>> {
-    private final Serializer<T> serializer;
-
-    public FlinkWriterStateSerializer(Serializer<T> serializer) {
-        this.serializer = serializer;
-    }
-
-    @Override
-    public int getVersion() {
-        return 0;
-    }
-
-    @Override
-    public byte[] serialize(FlinkWriterState<T> state) throws IOException {
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                final DataOutputStream out = new DataOutputStream(baos)) {
-            out.writeLong(state.getCheckpointId());
-            byte[] serialize = serializer.serialize(state.getState());
-            out.writeInt(serialize.length);
-            out.write(serialize);
-            out.flush();
-            return baos.toByteArray();
-        }
-    }
-
-    @Override
-    public FlinkWriterState<T> deserialize(int version, byte[] serialized) throws IOException {
-        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
-                final DataInputStream in = new DataInputStream(bais)) {
-            final long checkpointId = in.readLong();
-            final int size = in.readInt();
-            final byte[] stateBytes = new byte[size];
-            in.read(stateBytes);
-            T stateT = serializer.deserialize(stateBytes);
-            return new FlinkWriterState<>(checkpointId, stateT);
-        }
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
deleted file mode 100644
index 20530f241..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-/**
- * Operator chaining can avoid serialization and deserialization while the job is running. However,
- * the Flink Job initializes to determine if it is a generic type. Disabling serialization of
- * generic types causes an exception, even though the operator does not serialize at run time.
- */
-public class KryoTypeInfo<T> extends GenericTypeInfo<T> {
-    private static final long serialVersionUID = -4367528355992922603L;
-
-    public KryoTypeInfo(Class<T> typeClass) {
-        super(typeClass);
-    }
-
-    @Override
-    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-        return new KryoSerializer<T>(getTypeClass(), config);
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
deleted file mode 100644
index c00648580..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.serialization;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Row;
-
-/**
- * Wrapped {@link Row}.
- *
- * <p>Keep the original table name for the Dispatcher to distribute to the corresponding data stream
- */
-public class WrappedRow extends Tuple2<Row, String> {
-    private static final long serialVersionUID = -8325988931728734377L;
-
-    public WrappedRow() {
-        super();
-    }
-
-    public WrappedRow(Row row, String table) {
-        super(row, table);
-    }
-
-    public Row getRow() {
-        return this.f0;
-    }
-
-    public String getTable() {
-        return this.f1;
-    }
-
-    public void setRow(Row row) {
-        this.f0 = row;
-    }
-
-    public void setTable(String table) {
-        this.f1 = table;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
deleted file mode 100644
index c89496143..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-/**
- * The commit message wrapper, which is used to wrapper the different commit messages and unify the
- * different implementations of {@link CommitT}
- *
- * @param <CommitT> The generic type of commit message
- */
-public class CommitWrapper<CommitT> {
-    private final CommitT commit;
-
-    public CommitWrapper(CommitT commit) {
-        this.commit = commit;
-    }
-
-    public CommitT getCommit() {
-        return commit;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
deleted file mode 100644
index 3bbb2633a..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.sink.SinkCommitter;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.Sink;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * The committer wrapper of {@link SinkCommitter}, which is created by {@link
- * Sink#createCommitter()}, used to unify the different sink committer implementations
- *
- * @param <CommT> The generic type of commit message
- */
-@Slf4j
-public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
-
-    private final SinkCommitter<CommT> sinkCommitter;
-
-    FlinkCommitter(SinkCommitter<CommT> sinkCommitter) {
-        this.sinkCommitter = sinkCommitter;
-    }
-
-    @Override
-    public List<CommitWrapper<CommT>> commit(List<CommitWrapper<CommT>> committables)
-            throws IOException {
-        List<CommT> reCommittable =
-                sinkCommitter.commit(
-                        committables.stream()
-                                .map(CommitWrapper::getCommit)
-                                .collect(Collectors.toList()));
-        if (reCommittable != null && !reCommittable.isEmpty()) {
-            log.warn("this version not support re-commit when use flink engine");
-        }
-        // TODO re-commit the data
-        return new ArrayList<>();
-    }
-
-    @Override
-    public void close() throws Exception {}
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
deleted file mode 100644
index 7c95735e1..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * The committer wrapper of {@link SinkAggregatedCommitter}, which is created by {@link
- * Sink#createGlobalCommitter()}, used to unify the different implementations of {@link
- * SinkAggregatedCommitter}
- *
- * @param <CommT> The generic type of commit message type
- * @param <GlobalCommT> The generic type of global commit message type
- */
-@Slf4j
-public class FlinkGlobalCommitter<CommT, GlobalCommT>
-        implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
-
-    private final SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
-
-    FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
-        this.aggregatedCommitter = aggregatedCommitter;
-    }
-
-    @Override
-    public List<GlobalCommT> filterRecoveredCommittables(List globalCommittables)
-            throws IOException {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public GlobalCommT combine(List<CommitWrapper<CommT>> committables) throws IOException {
-        return aggregatedCommitter.combine(
-                committables.stream().map(CommitWrapper::getCommit).collect(Collectors.toList()));
-    }
-
-    @Override
-    public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
-        List<GlobalCommT> reCommittable = aggregatedCommitter.commit(globalCommittables);
-        if (reCommittable != null && !reCommittable.isEmpty()) {
-            log.warn("this version not support re-commit when use flink engine");
-        }
-        // TODO re-commit the data
-        return new ArrayList<>();
-    }
-
-    @Override
-    public void endOfInput() throws IOException {}
-
-    @Override
-    public void close() throws Exception {
-        aggregatedCommitter.close();
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
deleted file mode 100644
index 05fe17144..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.sink;
-
-import java.io.Serializable;
-
-/**
- * The writer state wrapper of {@link StateT}, used to unify the different implementations of {@link
- * StateT}
- *
- * @param <StateT> The generic type of the writer state
- */
-public class FlinkWriterState<StateT> implements Serializable {
-
-    private long checkpointId = 0;
-
-    private StateT state;
-
-    public FlinkWriterState(long checkpointId, StateT state) {
-        this.checkpointId = checkpointId;
-        this.state = state;
-    }
-
-    public long getCheckpointId() {
-        return checkpointId;
-    }
-
-    public void setCheckpointId(long checkpointId) {
-        this.checkpointId = checkpointId;
-    }
-
-    public StateT getState() {
-        return state;
-    }
-
-    public void setState(StateT state) {
-        this.state = state;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
deleted file mode 100644
index f0c6fc2ae..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-import org.apache.seatunnel.translation.source.BaseSourceFunction;
-
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.types.Row;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * The abstract implementation of {@link RichSourceFunction}, the entrypoint of flink source
- * translation
- */
-public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row>
-        implements CheckpointListener, ResultTypeQueryable<Row>, CheckpointedFunction {
-    private static final Logger LOG = LoggerFactory.getLogger(BaseSeaTunnelSourceFunction.class);
-
-    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
-    protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
-
-    protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
-    protected transient volatile Map<Integer, List<byte[]>> restoredState;
-
-    protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
-    protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
-
-    /** Flag indicating whether the consumer is still running. */
-    private volatile boolean running = true;
-
-    public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
-        this.source = source;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        this.internalSource = createInternalSource();
-        this.internalSource.open();
-    }
-
-    protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    @Override
-    public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
-        internalSource.run(
-                new RowCollector(
-                        sourceContext,
-                        sourceContext.getCheckpointLock(),
-                        source.getProducedType()));
-        // Wait for a checkpoint to complete:
-        // In the current version(version < 1.14.0), when the operator state of the source changes
-        // to FINISHED, jobs cannot be checkpoint executed.
-        final long prevCheckpointId = latestTriggerCheckpointId.get();
-        // Ensured Checkpoint enabled
-        if (getRuntimeContext() instanceof StreamingRuntimeContext
-                && ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
-            while (running && prevCheckpointId >= latestCompletedCheckpointId.get()) {
-                Thread.sleep(100);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        cancel();
-        LOG.debug("Close the SeaTunnelSourceFunction of Flink.");
-    }
-
-    @Override
-    public void cancel() {
-        running = false;
-        try {
-            if (internalSource != null) {
-                LOG.debug("Cancel the SeaTunnelSourceFunction of Flink.");
-                internalSource.close();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        internalSource.notifyCheckpointComplete(checkpointId);
-        latestCompletedCheckpointId.set(checkpointId);
-    }
-
-    @Override
-    public void notifyCheckpointAborted(long checkpointId) throws Exception {
-        internalSource.notifyCheckpointAborted(checkpointId);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public TypeInformation<Row> getProducedType() {
-        return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext snapshotContext) throws Exception {
-        final long checkpointId = snapshotContext.getCheckpointId();
-        latestTriggerCheckpointId.set(checkpointId);
-        if (!running) {
-            LOG.debug("snapshotState() called on closed source");
-        } else {
-            sourceState.clear();
-            sourceState.add(internalSource.snapshotState(checkpointId));
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
-        this.restoredState = new HashMap<>();
-        this.sourceState =
-                initializeContext
-                        .getOperatorStateStore()
-                        .getListState(
-                                new ListStateDescriptor<>(
-                                        getStateName(),
-                                        Types.MAP(
-                                                BasicTypeInfo.INT_TYPE_INFO,
-                                                Types.LIST(
-                                                        PrimitiveArrayTypeInfo
-                                                                .BYTE_PRIMITIVE_ARRAY_TYPE_INFO))));
-        if (initializeContext.isRestored()) {
-            // populate actual holder for restored state
-            sourceState.get().forEach(map -> restoredState.putAll(map));
-            LOG.info(
-                    "Consumer subtask {} restored state",
-                    getRuntimeContext().getIndexOfThisSubtask());
-        } else {
-            LOG.info(
-                    "Consumer subtask {} has no restore state.",
-                    getRuntimeContext().getIndexOfThisSubtask());
-        }
-    }
-
-    protected abstract String getStateName();
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
deleted file mode 100644
index 63b70242f..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-public class RowCollector implements Collector<SeaTunnelRow> {
-
-    protected final SourceFunction.SourceContext<Row> internalCollector;
-    protected final FlinkRowConverter rowSerialization;
-    protected final Object checkpointLock;
-
-    public RowCollector(
-            SourceFunction.SourceContext<Row> internalCollector,
-            Object checkpointLock,
-            SeaTunnelDataType<?> dataType) {
-        this.internalCollector = internalCollector;
-        this.checkpointLock = checkpointLock;
-        this.rowSerialization = new FlinkRowConverter(dataType);
-    }
-
-    @Override
-    public void collect(SeaTunnelRow record) {
-        try {
-            internalCollector.collect(rowSerialization.convert(record));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Object getCheckpointLock() {
-        return this.checkpointLock;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
deleted file mode 100644
index 0d03dbf78..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.source.BaseSourceFunction;
-import org.apache.seatunnel.translation.source.CoordinatedSource;
-
-/** The coordinated source function implementation of {@link BaseSeaTunnelSourceFunction} */
-public class SeaTunnelCoordinatedSource extends BaseSeaTunnelSourceFunction {
-
-    protected static final String COORDINATED_SOURCE_STATE_NAME = "coordinated-source-states";
-
-    public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
-        // TODO: Make sure the source is coordinated.
-        super(source);
-    }
-
-    @Override
-    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
-        return new CoordinatedSource<>(
-                source, restoredState, getRuntimeContext().getNumberOfParallelSubtasks());
-    }
-
-    @Override
-    protected String getStateName() {
-        return COORDINATED_SOURCE_STATE_NAME;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
deleted file mode 100644
index 6e1515d56..000000000
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.translation.flink.source;
-
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.translation.source.BaseSourceFunction;
-import org.apache.seatunnel.translation.source.ParallelSource;
-
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.types.Row;
-
-/** The parallel source function implementation of {@link BaseSeaTunnelSourceFunction} */
-public class SeaTunnelParallelSource extends BaseSeaTunnelSourceFunction
-        implements ParallelSourceFunction<Row> {
-
-    protected static final String PARALLEL_SOURCE_STATE_NAME = "parallel-source-states";
-
-    public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
-        // TODO: Make sure the source is uncoordinated.
-        super(source);
-    }
-
-    @Override
-    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
-        return new ParallelSource<>(
-                source,
-                restoredState,
-                getRuntimeContext().getNumberOfParallelSubtasks(),
-                getRuntimeContext().getIndexOfThisSubtask());
-    }
-
-    @Override
-    protected String getStateName() {
-        return PARALLEL_SOURCE_STATE_NAME;
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
similarity index 93%
copy from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
copy to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
index 03aa1ddae..de0f3702b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/pom.xml
@@ -22,8 +22,9 @@
         <version>${revision}</version>
     </parent>
 
-    <artifactId>seatunnel-translation-flink-15</artifactId>
-    <name>SeaTunnel : Translation : Flink : 1.5</name>
+    <artifactId>seatunnel-translation-flink-common</artifactId>
+    <packaging>jar</packaging>
+    <name>SeaTunnel : Translation : Flink : Common</name>
 
     <dependencies>
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/CommitWrapperSerializer.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkSimpleVersionedSerializer.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkWriterStateSerializer.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/KryoTypeInfo.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/WrappedRow.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/CommitWrapper.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkWriterState.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
similarity index 100%
rename from seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-15/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
rename to seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java