You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/06 15:12:24 UTC
[incubator-seatunnel] branch api-draft updated: Add complex seatunnel datatype (#1807)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 3753ed3e Add complex seatunnel datatype (#1807)
3753ed3e is described below
commit 3753ed3ef91c64dc8766fa1bec6894f77dc5fb3d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri May 6 23:12:19 2022 +0800
Add complex seatunnel datatype (#1807)
* Add complex seatunnel datatype
* implemente the complex type converter
---
.../apache/seatunnel/api/table/catalog/Column.java | 24 +--
.../table/connector/SupportReadingMetadata.java | 4 +-
.../apache/seatunnel/api/table/type/ArrayType.java | 15 ++
.../apache/seatunnel/api/table/type/BasicType.java | 48 +++++-
.../apache/seatunnel/api/table/type/DataType.java | 8 +-
.../apache/seatunnel/api/table/type/EnumType.java | 13 ++
.../apache/seatunnel/api/table/type/ListType.java | 12 +-
.../apache/seatunnel/api/table/type/MapType.java | 29 ++++
.../apache/seatunnel/api/table/type/PojoType.java | 25 ++-
.../seatunnel/api/table/type/TimestampType.java | 16 ++
.../flink/types/ArrayTypeConverter.java | 60 ++++++++
.../flink/types/BasicTypeConverter.java | 64 +++++---
.../flink/types/FlinkTypeConverter.java | 7 +-
.../translation/flink/types/PojoTypeConverter.java | 50 ++++++
.../flink/types/TimestampTypeConverter.java | 16 +-
.../flink/utils/TypeConverterUtils.java | 170 +++++++++++++++++++++
16 files changed, 492 insertions(+), 69 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 01d2a287..39304916 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -34,11 +34,11 @@ public abstract class Column {
/**
* Data type of the column.
*/
- protected final DataType dataType;
+ protected final DataType<?> dataType;
protected final String comment;
- private Column(String name, DataType dataType, String comment) {
+ private Column(String name, DataType<?> dataType, String comment) {
this.name = name;
this.dataType = dataType;
this.comment = comment;
@@ -47,7 +47,7 @@ public abstract class Column {
/**
* Creates a regular table column that represents physical data.
*/
- public static PhysicalColumn physical(String name, DataType dataType) {
+ public static PhysicalColumn physical(String name, DataType<?> dataType) {
return new PhysicalColumn(name, dataType);
}
@@ -58,7 +58,7 @@ public abstract class Column {
* <p>Allows to specify whether the column is virtual or not.
*/
public static MetadataColumn metadata(
- String name, DataType dataType, String metadataKey) {
+ String name, DataType<?> dataType, String metadataKey) {
return new MetadataColumn(name, dataType, metadataKey);
}
@@ -76,7 +76,7 @@ public abstract class Column {
/**
* Returns the data type of this column.
*/
- public DataType getDataType() {
+ public DataType<?> getDataType() {
return this.dataType;
}
@@ -97,7 +97,7 @@ public abstract class Column {
/**
* Returns a copy of the column with a replaced {@link DataType}.
*/
- public abstract Column copy(DataType newType);
+ public abstract Column copy(DataType<?> newType);
@Override
public boolean equals(Object o) {
@@ -127,11 +127,11 @@ public abstract class Column {
*/
public static final class PhysicalColumn extends Column {
- private PhysicalColumn(String name, DataType dataType) {
+ private PhysicalColumn(String name, DataType<?> dataType) {
this(name, dataType, null);
}
- private PhysicalColumn(String name, DataType dataType, String comment) {
+ private PhysicalColumn(String name, DataType<?> dataType, String comment) {
super(name, dataType, comment);
}
@@ -149,7 +149,7 @@ public abstract class Column {
}
@Override
- public Column copy(DataType newDataType) {
+ public Column copy(DataType<?> newDataType) {
return new PhysicalColumn(name, newDataType, comment);
}
}
@@ -162,13 +162,13 @@ public abstract class Column {
private final String metadataKey;
private MetadataColumn(
- String name, DataType dataType, String metadataKey) {
+ String name, DataType<?> dataType, String metadataKey) {
this(name, dataType, metadataKey, null);
}
private MetadataColumn(
String name,
- DataType dataType,
+ DataType<?> dataType,
String metadataKey,
String comment) {
super(name, dataType, comment);
@@ -193,7 +193,7 @@ public abstract class Column {
}
@Override
- public Column copy(DataType newDataType) {
+ public Column copy(DataType<?> newDataType) {
return new MetadataColumn(name, newDataType, metadataKey, comment);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index 3fc90e17..66188580 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -28,7 +28,7 @@ import java.util.Map;
*/
public interface SupportReadingMetadata {
- Map<String, DataType> listReadableMetadata(CatalogTable catalogTable);
+ Map<String, DataType<?>> listReadableMetadata(CatalogTable catalogTable);
- void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType dataType);
+ void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType<?> dataType);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
new file mode 100644
index 00000000..1e24cd25
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -0,0 +1,15 @@
+package org.apache.seatunnel.api.table.type;
+
+public class ArrayType<T> implements DataType<T> {
+
+ private final BasicType<T> elementType;
+
+ public ArrayType(BasicType<T> elementType) {
+ this.elementType = elementType;
+ }
+
+ public BasicType<T> getElementType() {
+ return elementType;
+ }
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 13489f22..c2834f7e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.api.table.type;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
import java.util.Date;
public class BasicType<T> implements DataType<T> {
@@ -29,19 +32,50 @@ public class BasicType<T> implements DataType<T> {
public static final BasicType<Long> LONG = new BasicType<>(Long.class);
public static final BasicType<Float> FLOAT = new BasicType<>(Float.class);
public static final BasicType<Byte> BYTE = new BasicType<>(Byte.class);
+ public static final BasicType<Short> SHORT = new BasicType<>(Short.class);
+ public static final BasicType<Character> CHARACTER = new BasicType<>(Character.class);
+ public static final BasicType<BigInteger> BIG_INTEGER = new BasicType<>(BigInteger.class);
+ public static final BasicType<BigDecimal> BIG_DECIMAL = new BasicType<>(BigDecimal.class);
+ public static final BasicType<Instant> INSTANT = new BasicType<>(Instant.class);
public static final BasicType<Void> NULL = new BasicType<>(Void.class);
- private final Class<T> typeClass;
+ /**
+ * The physical type class.
+ */
+ private final Class<T> physicalTypeClass;
- public BasicType(Class<T> typeClass) {
- if (typeClass == null) {
- throw new IllegalArgumentException("typeClass cannot be null");
+ public BasicType(Class<T> physicalTypeClass) {
+ if (physicalTypeClass == null) {
+ throw new IllegalArgumentException("physicalTypeClass cannot be null");
}
- this.typeClass = typeClass;
+ this.physicalTypeClass = physicalTypeClass;
+ }
+
+ public Class<T> getPhysicalTypeClass() {
+ return this.physicalTypeClass;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.physicalTypeClass.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ BasicType<?> other = (BasicType<?>) obj;
+ return this.physicalTypeClass.equals(other.physicalTypeClass);
}
@Override
- public Class<T> getTypeClass() {
- return this.typeClass;
+ public String toString() {
+ return "BasicType{" +
+ "physicalTypeClass=" + physicalTypeClass +
+ '}';
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
index 0c65f471..2183ad18 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
@@ -18,15 +18,9 @@
package org.apache.seatunnel.api.table.type;
/**
- * Data type of column in SeaTunnel.
+ * Logic data type of column in SeaTunnel.
*/
public interface DataType<T> {
- /**
- * The type class.
- *
- * @return the type class.
- */
- Class<T> getTypeClass();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
new file mode 100644
index 00000000..9b4f8517
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
@@ -0,0 +1,13 @@
+package org.apache.seatunnel.api.table.type;
+
+public class EnumType<T extends Enum<T>> implements DataType<T> {
+ private final Class<T> enumClass;
+
+ public EnumType(Class<T> enumClass) {
+ this.enumClass = enumClass;
+ }
+
+ public Class<T> getEnumClass() {
+ return enumClass;
+ }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 54ccad0b..41c57004 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -19,15 +19,13 @@ package org.apache.seatunnel.api.table.type;
public class ListType<T> implements DataType<T> {
- // todo: use DataType?
- private final Class<T> typeClass;
+ private final DataType<T> elementType;
- public ListType(Class<T> typeClass) {
- this.typeClass = typeClass;
+ public ListType(DataType<T> elementType) {
+ this.elementType = elementType;
}
- @Override
- public Class<T> getTypeClass() {
- return typeClass;
+ public DataType<T> getElementType() {
+ return elementType;
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
new file mode 100644
index 00000000..6c8c24b7
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
@@ -0,0 +1,29 @@
+package org.apache.seatunnel.api.table.type;
+
+import java.util.Map;
+
+public class MapType<K, V> implements DataType<Map<K, V>> {
+
+ private final DataType<K> keyType;
+ private final DataType<V> valueType;
+
+ public MapType(DataType<K> keyType, DataType<V> valueType) {
+ if (keyType == null) {
+ throw new IllegalArgumentException("keyType cannot be null");
+ }
+ if (valueType == null) {
+ throw new IllegalArgumentException("valueType cannot be null");
+ }
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ public DataType<K> getKeyType() {
+ return keyType;
+ }
+
+ public DataType<V> getValueType() {
+ return valueType;
+ }
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index e70fcd62..2892d011 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -17,18 +17,29 @@
package org.apache.seatunnel.api.table.type;
-// todo: we may don't need to pojo type
+import java.lang.reflect.Field;
+
public class PojoType<T> implements DataType<T> {
- private final Class<T> typeClass;
+ private final Class<T> pojoClass;
+ private final Field[] fields;
+ private final DataType<?>[] fieldTypes;
+
+ public PojoType(Class<T> pojoClass, Field[] fields, DataType<?>[] fieldTypes) {
+ this.pojoClass = pojoClass;
+ this.fields = fields;
+ this.fieldTypes = fieldTypes;
+ }
- public PojoType(Class<T> typeClass) {
- this.typeClass = typeClass;
+ public Class<T> getPojoClass() {
+ return pojoClass;
+ }
+ public Field[] getFields() {
+ return fields;
}
- @Override
- public Class<T> getTypeClass() {
- return typeClass;
+ public DataType<?>[] getFieldTypes() {
+ return fieldTypes;
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
new file mode 100644
index 00000000..390ce453
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
@@ -0,0 +1,16 @@
+package org.apache.seatunnel.api.table.type;
+
+import java.sql.Timestamp;
+
+public class TimestampType implements DataType<Timestamp> {
+
+ private final int precision;
+
+ public TimestampType(int precision) {
+ this.precision = precision;
+ }
+
+ public int getPrecision() {
+ return precision;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
new file mode 100644
index 00000000..d4aa1866
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+
+public class ArrayTypeConverter<T1, T2> implements FlinkTypeConverter<ArrayType<T1>, BasicArrayTypeInfo<T1, T2>> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public BasicArrayTypeInfo<T1, T2> convert(ArrayType<T1> arrayType) {
+ BasicType<T1> elementType = arrayType.getElementType();
+ if (BasicType.BOOLEAN.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.STRING.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.DOUBLE.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.INTEGER.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.LONG.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.FLOAT.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.BYTE.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.SHORT.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
+ }
+ if (BasicType.CHARACTER.equals(elementType)) {
+ return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
+ }
+ throw new IllegalArgumentException("Unsupported basic type: " + elementType);
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
index 7a2ee1ff..ed8967f8 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -18,70 +18,98 @@
package org.apache.seatunnel.translation.flink.types;
import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DataType;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
import java.util.Date;
-public class BasicTypeConverter<T1, T2> implements FlinkTypeConverter<T1, T2> {
+public class BasicTypeConverter<T1>
+ implements FlinkTypeConverter<BasicType<T1>, TypeInformation<T1>> {
- public static final BasicTypeConverter<String, String> STRING_CONVERTER =
+ public static final BasicTypeConverter<String> STRING_CONVERTER =
new BasicTypeConverter<>(
BasicType.STRING,
BasicTypeInfo.STRING_TYPE_INFO);
- public static final BasicTypeConverter<Integer, Integer> INTEGER_CONVERTER =
+ public static final BasicTypeConverter<Integer> INTEGER_CONVERTER =
new BasicTypeConverter<>(
BasicType.INTEGER,
BasicTypeInfo.INT_TYPE_INFO);
- public static final BasicTypeConverter<Boolean, Boolean> BOOLEAN_CONVERTER =
+ public static final BasicTypeConverter<Boolean> BOOLEAN_CONVERTER =
new BasicTypeConverter<>(
BasicType.BOOLEAN,
BasicTypeInfo.BOOLEAN_TYPE_INFO);
- public static final BasicTypeConverter<Date, Date> DATE_CONVERTER =
+ public static final BasicTypeConverter<Date> DATE_CONVERTER =
new BasicTypeConverter<>(
BasicType.DATE,
BasicTypeInfo.DATE_TYPE_INFO);
- public static final BasicTypeConverter<Double, Double> DOUBLE_CONVERTER =
+ public static final BasicTypeConverter<Double> DOUBLE_CONVERTER =
new BasicTypeConverter<>(
BasicType.DOUBLE,
BasicTypeInfo.DOUBLE_TYPE_INFO);
- public static final BasicTypeConverter<Long, Long> LONG_CONVERTER =
+ public static final BasicTypeConverter<Long> LONG_CONVERTER =
new BasicTypeConverter<>(
BasicType.LONG,
BasicTypeInfo.LONG_TYPE_INFO);
- public static final BasicTypeConverter<Float, Float> FLOAT_CONVERTER =
+ public static final BasicTypeConverter<Float> FLOAT_CONVERTER =
new BasicTypeConverter<>(
BasicType.FLOAT,
BasicTypeInfo.FLOAT_TYPE_INFO);
- public static final BasicTypeConverter<Byte, Byte> BYTE_CONVERTER =
+ public static final BasicTypeConverter<Byte> BYTE_CONVERTER =
new BasicTypeConverter<>(
BasicType.BYTE,
BasicTypeInfo.BYTE_TYPE_INFO);
- public static final BasicTypeConverter<Void, Void> NULL_CONVERTER =
+ public static final BasicTypeConverter<Short> SHORT_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.SHORT,
+ BasicTypeInfo.SHORT_TYPE_INFO);
+
+ public static final BasicTypeConverter<Character> CHARACTER_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.CHARACTER,
+ BasicTypeInfo.CHAR_TYPE_INFO);
+
+ public static final BasicTypeConverter<BigInteger> BIG_INTEGER_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BIG_INTEGER,
+ BasicTypeInfo.BIG_INT_TYPE_INFO);
+
+ public static final BasicTypeConverter<BigDecimal> BIG_DECIMAL =
+ new BasicTypeConverter<>(
+ BasicType.BIG_DECIMAL,
+ BasicTypeInfo.BIG_DEC_TYPE_INFO);
+
+ public static final BasicTypeConverter<Instant> INSTANT_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.INSTANT,
+ BasicTypeInfo.INSTANT_TYPE_INFO);
+
+ public static final BasicTypeConverter<Void> NULL_CONVERTER =
new BasicTypeConverter<>(
BasicType.NULL,
BasicTypeInfo.VOID_TYPE_INFO);
- private final DataType<T1> dataType;
- private final TypeInformation<T2> typeInformation;
+ private final BasicType<T1> seaTunnelDataType;
+ private final TypeInformation<T1> flinkTypeInformation;
- public BasicTypeConverter(DataType<T1> dataType, TypeInformation<T2> typeInformation) {
- this.dataType = dataType;
- this.typeInformation = typeInformation;
+ public BasicTypeConverter(BasicType<T1> seaTunnelDataType, TypeInformation<T1> flinkTypeInformation) {
+ this.seaTunnelDataType = seaTunnelDataType;
+ this.flinkTypeInformation = flinkTypeInformation;
}
@Override
- public TypeInformation<T2> convert(DataType<T1> dataType) {
- return typeInformation;
+ public TypeInformation<T1> convert(BasicType<T1> seaTunnelDataType) {
+ return flinkTypeInformation;
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index 78e6d5ab..f5912f36 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -25,16 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* Convert SeaTunnel {@link DataType} to flink type.
*/
-public interface FlinkTypeConverter<T1, T2>
- extends Converter<DataType<T1>, TypeInformation<T2>> {
+public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
/**
* Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}.
*
- * @param dataType SeaTunnel {@link DataType}
+ * @param seaTunnelDataType SeaTunnel {@link DataType}
* @return flink {@link TypeInformation}
*/
@Override
- TypeInformation<T2> convert(DataType<T1> dataType);
+ T2 convert(T1 seaTunnelDataType);
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
new file mode 100644
index 00000000..efd6e713
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class PojoTypeConverter<T1> implements FlinkTypeConverter<PojoType<T1>, PojoTypeInfo<T1>> {
+
+ @Override
+ public PojoTypeInfo<T1> convert(PojoType<T1> seaTunnelDataType) {
+ Class<T1> pojoClass = seaTunnelDataType.getPojoClass();
+ Field[] fields = seaTunnelDataType.getFields();
+ DataType<?>[] fieldTypes = seaTunnelDataType.getFieldTypes();
+ if (ArrayUtils.isEmpty(fields)) {
+ return new PojoTypeInfo<>(pojoClass, Collections.emptyList());
+ }
+ List<PojoField> pojoFieldList = new ArrayList<>(fields.length);
+ for (int i = 0; i < fields.length; i++) {
+ PojoField pojoField = new PojoField(fields[i], TypeConverterUtils.convertType(fieldTypes[i]));
+ pojoFieldList.add(pojoField);
+ }
+ return new PojoTypeInfo<>(pojoClass, pojoFieldList);
+ }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
similarity index 59%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
rename to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
index 0dfdacf4..b8bf1be7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
@@ -15,16 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.translation.flink.types;
-public class DateTimeType implements DataType {
+import org.apache.seatunnel.api.table.type.TimestampType;
- public DateTimeType() {
+import org.apache.flink.table.runtime.typeutils.TimestampDataTypeInfo;
+
+public class TimestampTypeConverter implements FlinkTypeConverter<TimestampType, TimestampDataTypeInfo> {
+
+ public static final TimestampTypeConverter INSTANCE = new TimestampTypeConverter();
+
+ private TimestampTypeConverter() {
}
@Override
- public Class getTypeClass() {
- return null;
+ public TimestampDataTypeInfo convert(TimestampType seaTunnelDataType) {
+ return new TimestampDataTypeInfo(seaTunnelDataType.getPrecision());
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
new file mode 100644
index 00000000..ae155306
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.utils;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.EnumType;
+import org.apache.seatunnel.api.table.type.ListType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.TimestampType;
+import org.apache.seatunnel.translation.flink.types.ArrayTypeConverter;
+import org.apache.seatunnel.translation.flink.types.BasicTypeConverter;
+import org.apache.seatunnel.translation.flink.types.PojoTypeConverter;
+import org.apache.seatunnel.translation.flink.types.TimestampTypeConverter;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+
+public class TypeConverterUtils {
+
+ @SuppressWarnings("unchecked")
+ public static <T1, T2> TypeInformation<T2> convertType(DataType<T1> dataType) {
+ if (dataType instanceof BasicType) {
+ return (TypeInformation<T2>) convertBasicType((BasicType<T1>) dataType);
+ }
+ if (dataType instanceof TimestampType) {
+ return (TypeInformation<T2>)
+ TimestampTypeConverter.INSTANCE.convert((TimestampType) dataType);
+ }
+ if (dataType instanceof ArrayType) {
+ return (TypeInformation<T2>) convertArrayType((ArrayType<T1>) dataType);
+ }
+ if (dataType instanceof ListType) {
+ return (TypeInformation<T2>) covertListType((ListType<T1>) dataType);
+ }
+ if (dataType instanceof EnumType) {
+
+ return (TypeInformation<T2>) coverEnumType((EnumType<?>) dataType);
+ }
+ if (dataType instanceof MapType) {
+ return (TypeInformation<T2>) convertMapType((MapType<?, ?>) dataType);
+ }
+ if (dataType instanceof PojoType) {
+ return (TypeInformation<T2>) convertPojoType((PojoType<T1>) dataType);
+ }
+ throw new IllegalArgumentException("Unsupported data type: " + dataType);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> TypeInformation<T> convertBasicType(BasicType<T> basicType) {
+ Class<T> physicalTypeClass = basicType.getPhysicalTypeClass();
+ if (physicalTypeClass == Boolean.class) {
+ BasicType<Boolean> booleanBasicType = (BasicType<Boolean>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.BOOLEAN_CONVERTER.convert(booleanBasicType);
+ }
+ if (physicalTypeClass == String.class) {
+ BasicType<String> stringBasicType = (BasicType<String>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.STRING_CONVERTER.convert(stringBasicType);
+ }
+ if (physicalTypeClass == Date.class) {
+ BasicType<Date> dateBasicType = (BasicType<Date>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.DATE_CONVERTER.convert(dateBasicType);
+ }
+ if (physicalTypeClass == Double.class) {
+ BasicType<Double> doubleBasicType = (BasicType<Double>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.DOUBLE_CONVERTER.convert(doubleBasicType);
+ }
+ if (physicalTypeClass == Integer.class) {
+ BasicType<Integer> integerBasicType = (BasicType<Integer>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.INTEGER_CONVERTER.convert(integerBasicType);
+ }
+ if (physicalTypeClass == Long.class) {
+ BasicType<Long> longBasicType = (BasicType<Long>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.LONG_CONVERTER.convert(longBasicType);
+ }
+ if (physicalTypeClass == Float.class) {
+ BasicType<Float> floatBasicType = (BasicType<Float>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.FLOAT_CONVERTER.convert(floatBasicType);
+ }
+ if (physicalTypeClass == Byte.class) {
+ BasicType<Byte> byteBasicType = (BasicType<Byte>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.BYTE_CONVERTER.convert(byteBasicType);
+ }
+ if (physicalTypeClass == Short.class) {
+ BasicType<Short> shortBasicType = (BasicType<Short>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.SHORT_CONVERTER.convert(shortBasicType);
+ }
+ if (physicalTypeClass == Character.class) {
+ BasicType<Character> characterBasicType = (BasicType<Character>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.CHARACTER_CONVERTER.convert(characterBasicType);
+ }
+ if (physicalTypeClass == BigInteger.class) {
+ BasicType<BigInteger> bigIntegerBasicType = (BasicType<BigInteger>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.BIG_INTEGER_CONVERTER.convert(bigIntegerBasicType);
+ }
+ if (physicalTypeClass == BigDecimal.class) {
+ BasicType<BigDecimal> bigDecimalBasicType = (BasicType<BigDecimal>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+ }
+ if (physicalTypeClass == Void.class) {
+ BasicType<Void> voidBasicType = (BasicType<Void>) basicType;
+ return (TypeInformation<T>)
+ BasicTypeConverter.NULL_CONVERTER.convert(voidBasicType);
+ }
+ throw new IllegalArgumentException("Unsupported basic type: " + basicType);
+ }
+
+ public static <T1, T2> BasicArrayTypeInfo<T1, T2> convertArrayType(ArrayType<T1> arrayType) {
+ ArrayTypeConverter<T1, T2> arrayTypeConverter = new ArrayTypeConverter<>();
+ return arrayTypeConverter.convert(arrayType);
+ }
+
+ public static <T> ListTypeInfo<T> covertListType(ListType<T> listType) {
+ DataType<T> elementType = listType.getElementType();
+ return new ListTypeInfo<>(convertType(elementType));
+ }
+
+ public static <T extends Enum<T>> EnumTypeInfo<T> coverEnumType(EnumType<T> enumType) {
+ Class<T> enumClass = enumType.getEnumClass();
+ return new EnumTypeInfo<>(enumClass);
+ }
+
+ public static <K, V> MapTypeInfo<K, V> convertMapType(MapType<K, V> mapType) {
+ DataType<K> keyType = mapType.getKeyType();
+ DataType<V> valueType = mapType.getValueType();
+ return new MapTypeInfo<>(convertType(keyType), convertType(valueType));
+ }
+
+ public static <T> PojoTypeInfo<T> convertPojoType(PojoType<T> pojoType) {
+ PojoTypeConverter<T> pojoTypeConverter = new PojoTypeConverter<>();
+ return pojoTypeConverter.convert(pojoType);
+ }
+}