You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/18 08:15:26 UTC
[incubator-seatunnel] branch api-draft updated: Add SeaTunnelRowTypeInfo to SeaTunnelSink (#1904)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 727c7b26 Add SeaTunnelRowTypeInfo to SeaTunnelSink (#1904)
727c7b26 is described below
commit 727c7b2618427840782760953ce9a32f07c6672a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed May 18 16:15:22 2022 +0800
Add SeaTunnelRowTypeInfo to SeaTunnelSink (#1904)
---
.../apache/seatunnel/api/sink/SeaTunnelSink.java | 10 +++
.../apache/seatunnel/api/table/type/Converter.java | 2 +
.../apache/seatunnel/api/table/type/PojoType.java | 4 ++
.../api/table/type/SeaTunnelRowTypeInfo.java | 6 +-
.../seatunnel/console/sink/ConsoleSink.java | 10 ++-
.../seatunnel/console/sink/ConsoleSinkWriter.java | 7 ++
.../core/flink/execution/SinkExecuteProcessor.java | 30 +++++---
.../flink/types/ArrayTypeConverter.java | 5 ++
.../flink/types/BasicTypeConverter.java | 6 ++
.../flink/types/FlinkTypeConverter.java | 9 +++
.../translation/flink/types/PojoTypeConverter.java | 5 ++
.../flink/types/TimestampTypeConverter.java | 5 ++
.../flink/utils/TypeConverterUtils.java | 80 ++++++++++++++++++++++
.../spark/types/ArrayTypeConverter.java | 5 ++
.../spark/types/BasicTypeConverter.java | 5 ++
.../translation/spark/types/PojoTypeConverter.java | 5 ++
.../spark/types/SparkDataTypeConverter.java | 3 +
.../spark/types/TimestampTypeConverter.java | 5 ++
18 files changed, 191 insertions(+), 11 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index 4c71f3b5..0033e3d9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.api.sink;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import java.io.IOException;
import java.io.Serializable;
@@ -40,6 +41,15 @@ import java.util.Optional;
*/
public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable, PluginIdentifierInterface {
+ /**
+ * Sets the row type info (field names and field types) of the rows this sink will receive.
+ * This method is expected to be called automatically by the translation layer, so that a sink
+ * can keep the schema and pass it on to the writers it creates.
+ *
+ * <p>The default implementation does nothing; sinks that need the schema override it.
+ *
+ * @param seaTunnelRowTypeInfo The row type info of sink.
+ */
+ default void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+
+ }
+
/**
* This method will be called to creat {@link SinkWriter}
*
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
index 662f2627..dc516665 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
@@ -20,4 +20,6 @@ package org.apache.seatunnel.api.table.type;
public interface Converter<T1, T2> {
T2 convert(T1 dataType);
+
+ /**
+ * Converts in the opposite direction of {@link #convert}: from {@code T2} back to {@code T1}.
+ *
+ * @param dataType the {@code T2} value to convert back
+ * @return the corresponding {@code T1} value
+ */
+ T1 reconvert(T2 dataType);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index e6af968f..f244143d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -25,6 +25,10 @@ public class PojoType<T> implements SeaTunnelDataType<T> {
private final Field[] fields;
private final SeaTunnelDataType<?>[] fieldTypes;
+ /**
+ * Creates a POJO type from the class alone. {@code null} is passed to the three-argument
+ * constructor for both the fields and the field types.
+ *
+ * @param pojoClass the class of the POJO
+ */
+ public PojoType(Class<T> pojoClass) {
+ this(pojoClass, null, null);
+ }
+
public PojoType(Class<T> pojoClass, Field[] fields, SeaTunnelDataType<?>[] fieldTypes) {
this.pojoClass = pojoClass;
this.fields = fields;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
index bd721b9d..835e4b3f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTypeInfo.java
@@ -20,9 +20,13 @@ package org.apache.seatunnel.api.table.type;
import lombok.AllArgsConstructor;
import lombok.Data;
+import java.io.Serializable;
+
@Data
@AllArgsConstructor
-public class SeaTunnelRowTypeInfo {
+public class SeaTunnelRowTypeInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
/**
* The field name of the {@link SeaTunnelRow}.
*/
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index b3cb26ab..5ae9aee3 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
import com.google.auto.service.AutoService;
@@ -29,9 +30,16 @@ import java.util.List;
@AutoService(SeaTunnelSink.class)
public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
+ // Row type info received through setTypeInfo; stays null until that method has been called.
+ private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+ /**
+ * Stores the row type info so that {@link #createWriter} can hand it to the writer it creates.
+ *
+ * @param seaTunnelRowTypeInfo The row type info of sink.
+ */
+ @Override
+ public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ }
+
@Override
public SinkWriter<SeaTunnelRow, ConsoleCommitInfo, ConsoleState> createWriter(SinkWriter.Context context) {
- return new ConsoleSinkWriter();
+ return new ConsoleSinkWriter(seaTunnelRowTypeInfo);
}
@Override
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index af54609d..650aa4c2 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
import org.slf4j.Logger;
@@ -30,6 +31,12 @@ public class ConsoleSinkWriter implements SinkWriter<SeaTunnelRow, ConsoleCommit
private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSinkWriter.class);
+ // Row type info of the rows this writer receives, as supplied by the creating sink.
+ private final SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+ /**
+ * Creates a writer that keeps the given row type info.
+ *
+ * @param seaTunnelRowTypeInfo row type info of the incoming rows; stored as-is, without a null check
+ */
+ public ConsoleSinkWriter(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ }
+
@Override
@SuppressWarnings("checkstyle:RegexpSingleline")
public void write(SeaTunnelRow element) {
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
index 85726023..f1621533 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/execution/SinkExecuteProcessor.java
@@ -18,28 +18,32 @@
package org.apache.seatunnel.core.flink.execution;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.collect.Lists;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import scala.Serializable;
-public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Row, Serializable, Serializable, Serializable>> {
+public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
protected SinkExecuteProcessor(FlinkEnvironment flinkEnvironment,
List<? extends Config> pluginConfigs) {
@@ -47,19 +51,16 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Ro
}
@Override
- protected List<Sink<Row, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
+ protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> pluginConfigs) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
- FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
- List<Sink<Row, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
+ List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
"seatunnel",
"sink",
sinkConfig.getString("plugin_name"));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> pluginInstance =
- sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
- return flinkSinkConverter.convert(pluginInstance, Collections.emptyMap());
+ return (SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>) sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
}).collect(Collectors.toList());
flinkEnvironment.registerPlugin(pluginJars);
return sinks;
@@ -68,12 +69,23 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<Sink<Ro
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws Exception {
DataStream<Row> input = upstreamDataStreams.get(0);
+ FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
+ SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
- stream.sinkTo(plugins.get(i));
+ seaTunnelSink.setTypeInfo(getSeaTunnelRowTypeInfo(stream));
+ stream.sinkTo(flinkSinkConverter.convert(seaTunnelSink, Collections.emptyMap()));
}
// the sink is the last stream
return null;
}
+
+    /**
+     * Derives the SeaTunnel row type info from the Flink type of the given stream.
+     *
+     * @param stream the stream whose type is cast to {@link RowTypeInfo}
+     * @return row type info made of the stream's field names and its converted field types
+     */
+    private SeaTunnelRowTypeInfo getSeaTunnelRowTypeInfo(DataStream<Row> stream) {
+        RowTypeInfo rowTypeInfo = (RowTypeInfo) stream.getType();
+        // Every Flink field type is mapped to its SeaTunnel counterpart, keeping the field order.
+        SeaTunnelDataType<?>[] convertedFieldTypes = Arrays
+            .stream(rowTypeInfo.getFieldTypes())
+            .map(TypeConverterUtils::convertType)
+            .toArray(SeaTunnelDataType[]::new);
+        return new SeaTunnelRowTypeInfo(rowTypeInfo.getFieldNames(), convertedFieldTypes);
+    }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
index c10f0450..f52679b0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
@@ -58,4 +58,9 @@ public class ArrayTypeConverter<T1, T2> implements FlinkTypeConverter<ArrayType<
}
throw new IllegalArgumentException("Unsupported basic type: " + elementType);
}
+
+    /**
+     * Converting a Flink {@link BasicArrayTypeInfo} back to a SeaTunnel {@link ArrayType} is not
+     * implemented yet.
+     *
+     * @param typeInformation flink array type information
+     * @return never returns normally
+     * @throws UnsupportedOperationException always, so callers fail here instead of receiving a
+     *                                       {@code null} type and failing later
+     */
+    @Override
+    public ArrayType<T1> reconvert(BasicArrayTypeInfo<T1, T2> typeInformation) {
+        throw new UnsupportedOperationException(
+            "Reconverting flink array type is not supported yet: " + typeInformation);
+    }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
index e379076c..6b6122d9 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -112,4 +112,10 @@ public class BasicTypeConverter<T1>
public TypeInformation<T1> convert(BasicType<T1> seaTunnelDataType) {
return flinkTypeInformation;
}
+
+ /**
+ * Returns the SeaTunnel basic type held by this converter; the given type information is not inspected.
+ *
+ * @param dataType flink type information (unused)
+ * @return the {@code seaTunnelDataType} of this converter
+ */
+ @Override
+ public BasicType<T1> reconvert(TypeInformation<T1> dataType) {
+ return seaTunnelDataType;
+ }
+
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index 8e46e9e1..f5e19c05 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -36,4 +36,13 @@ public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
@Override
T2 convert(T1 seaTunnelDataType);
+ /**
+ * Converts a Flink {@link TypeInformation} back to the SeaTunnel {@link SeaTunnelDataType};
+ * the opposite direction of {@link #convert}.
+ *
+ * @param typeInformation flink {@link TypeInformation}
+ * @return seatunnel {@link SeaTunnelDataType}
+ */
+ @Override
+ T1 reconvert(T2 typeInformation);
+
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
index b2e41e56..637fefae 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
@@ -28,4 +28,9 @@ public class PojoTypeConverter<T1> implements FlinkTypeConverter<PojoType<T1>, P
Class<T1> pojoClass = seaTunnelDataType.getPojoClass();
return (PojoTypeInfo<T1>) PojoTypeInfo.of(pojoClass);
}
+
+ /**
+ * Builds a {@link PojoType} from the type class of the given Flink POJO type information.
+ * Only the class is carried over, through the single-argument {@link PojoType} constructor.
+ *
+ * @param typeInformation flink POJO type information
+ * @return a {@link PojoType} for the same class
+ */
+ @Override
+ public PojoType<T1> reconvert(PojoTypeInfo<T1> typeInformation) {
+ return new PojoType<>(typeInformation.getTypeClass());
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
index b8bf1be7..06067065 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
@@ -33,4 +33,9 @@ public class TimestampTypeConverter implements FlinkTypeConverter<TimestampType,
public TimestampDataTypeInfo convert(TimestampType seaTunnelDataType) {
return new TimestampDataTypeInfo(seaTunnelDataType.getPrecision());
}
+
+ /**
+ * Builds a SeaTunnel {@link TimestampType} with the precision reported by the given Flink
+ * timestamp type information.
+ *
+ * @param typeInformation flink timestamp type information
+ * @return a {@link TimestampType} with the same precision
+ */
+ @Override
+ public TimestampType reconvert(TimestampDataTypeInfo typeInformation) {
+ return new TimestampType(typeInformation.getPrecision());
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
index 6341fcdb..51a90297 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.translation.flink.types.PojoTypeConverter;
import org.apache.seatunnel.translation.flink.types.TimestampTypeConverter;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
@@ -47,6 +48,14 @@ public class TypeConverterUtils {
throw new UnsupportedOperationException("TypeConverterUtils is a utility class and cannot be instantiated");
}
+    /**
+     * Converts a Flink {@link TypeInformation} to the corresponding {@link SeaTunnelDataType}.
+     * Only {@link BasicTypeInfo} is handled so far.
+     *
+     * @param dataType flink type information
+     * @param <T1> value type described by the flink type information
+     * @param <T2> value type of the returned SeaTunnel data type
+     * @return the SeaTunnel data type
+     * @throws IllegalArgumentException if {@code dataType} is not a {@link BasicTypeInfo}, or is a
+     *                                  basic type that has no SeaTunnel counterpart
+     */
+    @SuppressWarnings("unchecked")
+    public static <T1, T2> SeaTunnelDataType<T2> convertType(TypeInformation<T1> dataType) {
+        if (dataType instanceof BasicTypeInfo) {
+            // T2 is chosen by the caller, so this cast cannot be checked at runtime.
+            return (SeaTunnelDataType<T2>) convertBasicType((BasicTypeInfo<T1>) dataType);
+        }
+        // todo: support the non-basic flink types (array, pojo, timestamp, ...)
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+
@SuppressWarnings("unchecked")
public static <T1, T2> TypeInformation<T2> convertType(SeaTunnelDataType<T1> dataType) {
if (dataType instanceof BasicType) {
@@ -146,6 +155,77 @@ public class TypeConverterUtils {
throw new IllegalArgumentException("Unsupported basic type: " + basicType);
}
+    /**
+     * Maps a Flink {@link BasicTypeInfo} to a SeaTunnel data type, selected by the class the type
+     * information describes. Each supported class is delegated to the matching
+     * {@link BasicTypeConverter} constant.
+     *
+     * @param flinkDataType flink basic type information
+     * @param <T1> value type described by the type information
+     * @return the SeaTunnel data type produced by the matching converter
+     * @throws IllegalArgumentException if the described class matches none of the branches below
+     */
+    @SuppressWarnings("unchecked")
+    private static <T1> SeaTunnelDataType<T1> convertBasicType(BasicTypeInfo<T1> flinkDataType) {
+        final Class<T1> typeClass = flinkDataType.getTypeClass();
+        if (typeClass == Boolean.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.BOOLEAN_CONVERTER
+                .reconvert((TypeInformation<Boolean>) flinkDataType);
+        } else if (typeClass == String.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.STRING_CONVERTER
+                .reconvert((TypeInformation<String>) flinkDataType);
+        } else if (typeClass == Date.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.DATE_CONVERTER
+                .reconvert((TypeInformation<Date>) flinkDataType);
+        } else if (typeClass == Double.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.DOUBLE_CONVERTER
+                .reconvert((TypeInformation<Double>) flinkDataType);
+        } else if (typeClass == Integer.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.INTEGER_CONVERTER
+                .reconvert((TypeInformation<Integer>) flinkDataType);
+        } else if (typeClass == Long.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.LONG_CONVERTER
+                .reconvert((TypeInformation<Long>) flinkDataType);
+        } else if (typeClass == Float.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.FLOAT_CONVERTER
+                .reconvert((TypeInformation<Float>) flinkDataType);
+        } else if (typeClass == Byte.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.BYTE_CONVERTER
+                .reconvert((TypeInformation<Byte>) flinkDataType);
+        } else if (typeClass == Short.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.SHORT_CONVERTER
+                .reconvert((TypeInformation<Short>) flinkDataType);
+        } else if (typeClass == Character.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.CHARACTER_CONVERTER
+                .reconvert((TypeInformation<Character>) flinkDataType);
+        } else if (typeClass == BigInteger.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.BIG_INTEGER_CONVERTER
+                .reconvert((TypeInformation<BigInteger>) flinkDataType);
+        } else if (typeClass == BigDecimal.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.BIG_DECIMAL_CONVERTER
+                .reconvert((TypeInformation<BigDecimal>) flinkDataType);
+        } else if (typeClass == Void.class) {
+            return (SeaTunnelDataType<T1>) BasicTypeConverter.NULL_CONVERTER
+                .reconvert((TypeInformation<Void>) flinkDataType);
+        }
+        throw new IllegalArgumentException("Unsupported flink type: " + flinkDataType);
+    }
+
public static <T1, T2> BasicArrayTypeInfo<T1, T2> convertArrayType(ArrayType<T1> arrayType) {
ArrayTypeConverter<T1, T2> arrayTypeConverter = new ArrayTypeConverter<>();
return arrayTypeConverter.convert(arrayType);
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
index 71f0820d..0aebd97c 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/ArrayTypeConverter.java
@@ -31,4 +31,9 @@ public class ArrayTypeConverter<T1>
DataType elementType = TypeConverterUtils.convert(seaTunnelDataType.getElementType());
return DataTypes.createArrayType(elementType);
}
+
+    /**
+     * Converting a Spark array type back to a SeaTunnel {@link ArrayType} is not implemented yet.
+     *
+     * @param dataType spark array type
+     * @return never returns normally
+     * @throws UnsupportedOperationException always, so callers fail here instead of receiving a
+     *                                       {@code null} type and failing later
+     */
+    @Override
+    public ArrayType<T1> reconvert(org.apache.spark.sql.types.ArrayType dataType) {
+        throw new UnsupportedOperationException(
+            "Reconverting spark array type is not supported yet: " + dataType);
+    }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
index 1c8df193..c2c43376 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/BasicTypeConverter.java
@@ -126,4 +126,9 @@ public class BasicTypeConverter<T1>
public DataType convert(BasicType<T1> seaTunnelDataType) {
return sparkDataType;
}
+
+ /**
+ * Returns the SeaTunnel basic type held by this converter; the given Spark data type is not inspected.
+ *
+ * @param dataType spark data type (unused)
+ * @return the {@code seatunnelDataType} of this converter
+ */
+ @Override
+ public BasicType<T1> reconvert(DataType dataType) {
+ return seatunnelDataType;
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
index 4c757151..c2f1314f 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/PojoTypeConverter.java
@@ -26,4 +26,9 @@ public class PojoTypeConverter<T1> implements SparkDataTypeConverter<PojoType<T1
public ObjectType convert(PojoType<T1> seaTunnelDataType) {
return new ObjectType(seaTunnelDataType.getPojoClass());
}
+
+    /**
+     * Builds a {@link PojoType} from the class wrapped by the given Spark {@link ObjectType}.
+     * Only the class is carried over, through the single-argument {@link PojoType} constructor.
+     *
+     * @param dataType spark object type
+     * @return a {@link PojoType} for the wrapped class
+     */
+    @Override
+    @SuppressWarnings("unchecked") // T1 is chosen by the caller; the cast cannot be checked at runtime
+    public PojoType<T1> reconvert(ObjectType dataType) {
+        return new PojoType<>((Class<T1>) dataType.cls());
+    }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
index 6bbd27bd..7336c1b7 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/SparkDataTypeConverter.java
@@ -32,4 +32,7 @@ public interface SparkDataTypeConverter<T1, T2> extends Converter<T1, T2> {
*/
@Override
T2 convert(T1 seaTunnelDataType);
+
+ /**
+ * Converts a Spark data type {@code T2} back to the SeaTunnel data type {@code T1}; the
+ * opposite direction of {@link #convert}.
+ *
+ * @param dataType spark data type
+ * @return the SeaTunnel data type
+ */
+ @Override
+ T1 reconvert(T2 dataType);
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
index fd6540af..e82aa1ac 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/types/TimestampTypeConverter.java
@@ -34,4 +34,9 @@ public class TimestampTypeConverter
public org.apache.spark.sql.types.TimestampType convert(TimestampType seaTunnelDataType) {
return (org.apache.spark.sql.types.TimestampType) DataTypes.TimestampType;
}
+
+ /**
+ * Builds a SeaTunnel {@link TimestampType} from {@code dataType.defaultSize()}.
+ *
+ * <p>NOTE(review): the Flink counterpart passes a precision to this constructor, while Spark's
+ * {@code defaultSize()} appears to be the storage size of a value in bytes rather than a
+ * fractional-second precision. Confirm this is the intended argument.
+ *
+ * @param dataType spark timestamp type
+ * @return a SeaTunnel {@link TimestampType}
+ */
+ @Override
+ public TimestampType reconvert(org.apache.spark.sql.types.TimestampType dataType) {
+ return new TimestampType(dataType.defaultSize());
+ }
}