You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/18 08:15:26 UTC

[incubator-seatunnel] branch api-draft updated: Add SeaTunnelRowTypeInfo to SeaTunnelSink (#1904)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 727c7b26 Add SeaTunnelRowTypeInfo to SeaTunnelSink (#1904)
727c7b26 is described below

commit 727c7b2618427840782760953ce9a32f07c6672a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 18 16:15:22 2022 +0800

    Add SeaTunnelRowTypeInfo to SeaTunnelSink (#1904)
---
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   | 10 +++
 .../apache/seatunnel/api/table/type/Converter.java |  2 +
 .../apache/seatunnel/api/table/type/PojoType.java  |  4 ++
 .../api/table/type/SeaTunnelRowTypeInfo.java       |  6 +-
 .../seatunnel/console/sink/ConsoleSink.java        | 10 ++-
 .../seatunnel/console/sink/ConsoleSinkWriter.java  |  7 ++
 .../core/flink/execution/SinkExecuteProcessor.java | 30 +++++---
 .../flink/types/ArrayTypeConverter.java            |  5 ++
 .../flink/types/BasicTypeConverter.java            |  6 ++
 .../flink/types/FlinkTypeConverter.java            |  9 +++
 .../translation/flink/types/PojoTypeConverter.java |  5 ++
 .../flink/types/TimestampTypeConverter.java        |  5 ++
 .../flink/utils/TypeConverterUtils.java            | 80 ++++++++++++++++++++++
 .../spark/types/ArrayTypeConverter.java            |  5 ++
 .../spark/types/BasicTypeConverter.java            |  5 ++
 .../translation/spark/types/PojoTypeConverter.java |  5 ++
 .../spark/types/SparkDataTypeConverter.java        |  3 +
 .../spark/types/TimestampTypeConverter.java        |  5 ++
 18 files changed, 191 insertions(+), 11 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 4c71f3b5..0033e3d9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.sink;
 
 import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -40,6 +41,15 @@ import java.util.Optional;
  */
 public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface {
 
+    /**
+     * Set the row type info of sink row data. This method will be automatically called by translation.
+     *
+     * @param seaTunnelRowTypeInfo The row type info of sink.
+     */
+    default void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+
+    }
+
     /**
      * This method will be called to creat {@link SinkWriter}
      *
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
index 662f2627..dc516665 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
@@ -20,4 +20,6 @@ package org.apache.seatunnel.api.table.type;
 public interface Converter<T1, T2> {
 
     T2 convert(T1 dataType);
+
+    T1 reconvert(T2 dataType);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index e6af968f..f244143d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -25,6 +25,10 @@ public class PojoType<T> implements SeaTunnelDataType<T> {
     private final Field[] fields;
     private final SeaTunnelDataType<?>[] fieldTypes;
 
+    public PojoType(Class<T> pojoClass) {
+        this(pojoClass, null, null);
+    }
+
     public PojoType(Class<T> pojoClass, Field[] fields, SeaTunnelDataType<?>[] fieldTypes) {
         this.pojoClass = pojoClass;
         this.fields = fields;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
index bd721b9d..835e4b3f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
@@ -20,9 +20,13 @@ package org.apache.seatunnel.api.table.type;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
+import java.io.Serializable;
+
 @Data
 @AllArgsConstructor
-public class SeaTunnelRowTypeInfo {
+public class SeaTunnelRowTypeInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
     /**
      * The field name of the {@link SeaTunnelRow}.
      */
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index b3cb26ab..5ae9aee3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.console.sink;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
 
 import com.google.auto.service.AutoService;
@@ -29,9 +30,16 @@ import java.util.List;
 @AutoService(SeaTunnelSink.class)
 public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
 
+    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+    }
+
     @Override
     public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> createWriter(SinkWriter.Context context) {
-        return new ConsoleSinkWriter();
+        return new ConsoleSinkWriter(seaTunnelRowTypeInfo);
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index af54609d..650aa4c2 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
 
 import org.slf4j.Logger;
@@ -30,6 +31,12 @@ public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommit
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
 
+    private final SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+    public ConsoleSinkWriter(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+    }
+
     @Override
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public void write(SeaTunnelRow element) {
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
index 85726023..f1621533 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
@@ -18,28 +18,32 @@
 package org.apache.seatunnel.core.flink.execution;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import scala.Serializable;
 
-public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Row, Serializable, Serializable, Serializable>> {
+public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
 
     protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                    List<? extends Config> pluginConfigs) {
@@ -47,19 +51,16 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Ro
     }
 
     @Override
-    protected List<Sink<Row, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
+    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
         SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
-        FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
-        List<Sink<Row, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
+        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
             PluginIdentifier pluginIdentifier = PluginIdentifier.of(
                 "seatunnel",
                 "sink",
                 sinkConfig.getString("plugin_name"));
             pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> pluginInstance =
-                sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
-            return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
+            return (SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>) sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
         }).collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
         return sinks;
@@ -68,12 +69,23 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Ro
     @Override
     public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception {
         DataStream<Row> input = upstreamDataStreams.get(0);
+        FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
+            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
-            stream.sinkTo(plugins.get(i));
+            seaTunnelSink.setTypeInfo(getSeaTunnelRowTypeInfo(stream));
+            stream.sinkTo(flinkSinkConverter.convert(seaTunnelSink, Collections.emptyMap()));
         }
         // the sink is the last stream
         return null;
     }
+
+    private SeaTunnelRowTypeInfo getSeaTunnelRowTypeInfo(DataStream<Row> stream) {
+        RowTypeInfo typeInformation = (RowTypeInfo) stream.getType();
+        String[] fieldNames = typeInformation.getFieldNames();
+        SeaTunnelDataType<?>[] seaTunnelDataTypes = Arrays.stream(typeInformation.getFieldTypes())
+            .map(TypeConverterUtils::convertType).toArray(SeaTunnelDataType[]::new);
+        return new SeaTunnelRowTypeInfo(fieldNames, seaTunnelDataTypes);
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
index c10f0450..f52679b0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
@@ -58,4 +58,9 @@ public class ArrayTypeConverter<T1, T2> implements FlinkTypeConverter<ArrayType<
         }
         throw new IllegalArgumentException("Unsupported basic type: " + elementType);
     }
+
+    @Override
+    public ArrayType<T1> reconvert(BasicArrayTypeInfo<T1, T2> typeInformation) {
+        return null;
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
index e379076c..6b6122d9 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -112,4 +112,10 @@ public class BasicTypeConverter<T1>
     public TypeInformation<T1> convert(BasicType<T1> seaTunnelDataType) {
         return flinkTypeInformation;
     }
+
+    @Override
+    public BasicType<T1> reconvert(TypeInformation<T1> dataType) {
+        return seaTunnelDataType;
+    }
+
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index 8e46e9e1..f5e19c05 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -36,4 +36,13 @@ public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
     @Override
     T2 convert(T1 seaTunnelDataType);
 
+    /**
+     * Convert flink {@link TypeInformation} to SeaTunnel {@link SeaTunnelDataType}.
+     *
+     * @param typeInformation flink {@link TypeInformation}
+     * @return seatunnel {@link SeaTunnelDataType}
+     */
+    @Override
+    T1 reconvert(T2 typeInformation);
+
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
index b2e41e56..637fefae 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
@@ -28,4 +28,9 @@ public class PojoTypeConverter<T1> implements FlinkTypeConverter<PojoType<T1>, P
         Class<T1> pojoClass = seaTunnelDataType.getPojoClass();
         return (PojoTypeInfo<T1>) PojoTypeInfo.of(pojoClass);
     }
+
+    @Override
+    public PojoType<T1> reconvert(PojoTypeInfo<T1> typeInformation) {
+        return new PojoType<>(typeInformation.getTypeClass());
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
index b8bf1be7..06067065 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
@@ -33,4 +33,9 @@ public class TimestampTypeConverter implements FlinkTypeConverter<TimestampType,
     public TimestampDataTypeInfo convert(TimestampType seaTunnelDataType) {
         return new TimestampDataTypeInfo(seaTunnelDataType.getPrecision());
     }
+
+    @Override
+    public TimestampType reconvert(TimestampDataTypeInfo typeInformation) {
+        return new TimestampType(typeInformation.getPrecision());
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index 6341fcdb..51a90297 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.translation.flink.types.PojoTypeConverter;
 import org.apache.seatunnel.translation.flink.types.TimestampTypeConverter;
 
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
@@ -47,6 +48,14 @@ public class TypeConverterUtils {
         throw new UnsupportedOperationException("TypeConverterUtils is a utility class and cannot be instantiated");
     }
 
+    public static <T1, T2> SeaTunnelDataType<T2> convertType(TypeInformation<T1> dataType) {
+        if (dataType instanceof BasicTypeInfo) {
+            return (SeaTunnelDataType<T2>) convertBasicType((BasicTypeInfo<T1>) dataType);
+        }
+        // todo:
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+
     @SuppressWarnings("unchecked")
     public static <T1, T2> TypeInformation<T2> convertType(SeaTunnelDataType<T1> dataType) {
         if (dataType instanceof BasicType) {
@@ -146,6 +155,77 @@ public class TypeConverterUtils {
         throw new IllegalArgumentException("Unsupported basic type: " + basicType);
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T1> SeaTunnelDataType<T1> convertBasicType(BasicTypeInfo<T1> flinkDataType) {
+        Class<T1> physicalTypeClass = flinkDataType.getTypeClass();
+        if (physicalTypeClass == Boolean.class) {
+            TypeInformation<Boolean> booleanTypeInformation = (TypeInformation<Boolean>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.BOOLEAN_CONVERTER.reconvert(booleanTypeInformation);
+        }
+        if (physicalTypeClass == String.class) {
+            TypeInformation<String> stringBasicType = (TypeInformation<String>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.STRING_CONVERTER.reconvert(stringBasicType);
+        }
+        if (physicalTypeClass == Date.class) {
+            TypeInformation<Date> dateBasicType = (TypeInformation<Date>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.DATE_CONVERTER.reconvert(dateBasicType);
+        }
+        if (physicalTypeClass == Double.class) {
+            TypeInformation<Double> doubleBasicType = (TypeInformation<Double>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.DOUBLE_CONVERTER.reconvert(doubleBasicType);
+        }
+        if (physicalTypeClass == Integer.class) {
+            TypeInformation<Integer> integerBasicType = (TypeInformation<Integer>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.INTEGER_CONVERTER.reconvert(integerBasicType);
+        }
+        if (physicalTypeClass == Long.class) {
+            TypeInformation<Long> longBasicType = (TypeInformation<Long>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.LONG_CONVERTER.reconvert(longBasicType);
+        }
+        if (physicalTypeClass == Float.class) {
+            TypeInformation<Float> floatBasicType = (TypeInformation<Float>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.FLOAT_CONVERTER.reconvert(floatBasicType);
+        }
+        if (physicalTypeClass == Byte.class) {
+            TypeInformation<Byte> byteBasicType = (TypeInformation<Byte>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.BYTE_CONVERTER.reconvert(byteBasicType);
+        }
+        if (physicalTypeClass == Short.class) {
+            TypeInformation<Short> shortBasicType = (TypeInformation<Short>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.SHORT_CONVERTER.reconvert(shortBasicType);
+        }
+        if (physicalTypeClass == Character.class) {
+            TypeInformation<Character> characterBasicType = (TypeInformation<Character>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.CHARACTER_CONVERTER.reconvert(characterBasicType);
+        }
+        if (physicalTypeClass == BigInteger.class) {
+            TypeInformation<BigInteger> bigIntegerBasicType = (TypeInformation<BigInteger>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.BIG_INTEGER_CONVERTER.reconvert(bigIntegerBasicType);
+        }
+        if (physicalTypeClass == BigDecimal.class) {
+            TypeInformation<BigDecimal> bigDecimalBasicType = (TypeInformation<BigDecimal>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.BIG_DECIMAL_CONVERTER.reconvert(bigDecimalBasicType);
+        }
+        if (physicalTypeClass == Void.class) {
+            TypeInformation<Void> voidBasicType = (TypeInformation<Void>) flinkDataType;
+            return (SeaTunnelDataType<T1>)
+                BasicTypeConverter.NULL_CONVERTER.reconvert(voidBasicType);
+        }
+        throw new IllegalArgumentException("Unsupported flink type: " + flinkDataType);
+    }
+
     public static <T1, T2> BasicArrayTypeInfo<T1, T2> convertArrayType(ArrayType<T1> arrayType) {
         ArrayTypeConverter<T1, T2> arrayTypeConverter = new ArrayTypeConverter<>();
         return arrayTypeConverter.convert(arrayType);
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
index 71f0820d..0aebd97c 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
@@ -31,4 +31,9 @@ public class ArrayTypeConverter<T1>
         DataType elementType = TypeConverterUtils.convert(seaTunnelDataType.getElementType());
         return DataTypes.createArrayType(elementType);
     }
+
+    @Override
+    public ArrayType<T1> reconvert(org.apache.spark.sql.types.ArrayType dataType) {
+        return null;
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
index 1c8df193..c2c43376 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
@@ -126,4 +126,9 @@ public class BasicTypeConverter<T1>
     public DataType convert(BasicType<T1> seaTunnelDataType) {
         return sparkDataType;
     }
+
+    @Override
+    public BasicType<T1> reconvert(DataType dataType) {
+        return seatunnelDataType;
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
index 4c757151..c2f1314f 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
@@ -26,4 +26,9 @@ public class PojoTypeConverter<T1> implements SparkDataTypeConverter<PojoType<T1
     public ObjectType convert(PojoType<T1> seaTunnelDataType) {
         return new ObjectType(seaTunnelDataType.getPojoClass());
     }
+
+    @Override
+    public PojoType<T1> reconvert(ObjectType dataType) {
+        return new PojoType<>((Class<T1>) dataType.cls());
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
index 6bbd27bd..7336c1b7 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
@@ -32,4 +32,7 @@ public interface SparkDataTypeConverter<T1, T2> extends Converter<T1, T2> {
      */
     @Override
     T2 convert(T1 seaTunnelDataType);
+
+    @Override
+    T1 reconvert(T2 dataType);
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
index fd6540af..e82aa1ac 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
@@ -34,4 +34,9 @@ public class TimestampTypeConverter
     public org.apache.spark.sql.types.TimestampType convert(TimestampType seaTunnelDataType) {
         return (org.apache.spark.sql.types.TimestampType) DataTypes.TimestampType;
     }
+
+    @Override
+    public TimestampType reconvert(org.apache.spark.sql.types.TimestampType dataType) {
+        return new TimestampType(dataType.defaultSize());
+    }
 }