You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/06 15:12:24 UTC

[incubator-seatunnel] branch api-draft updated: Add complex seatunnel datatype (#1807)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 3753ed3e Add complex seatunnel datatype (#1807)
3753ed3e is described below

commit 3753ed3ef91c64dc8766fa1bec6894f77dc5fb3d
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri May 6 23:12:19 2022 +0800

    Add complex seatunnel datatype (#1807)
    
    * Add complex seatunnel datatype
    * implemente the complex type converter
---
 .../apache/seatunnel/api/table/catalog/Column.java |  24 +--
 .../table/connector/SupportReadingMetadata.java    |   4 +-
 .../apache/seatunnel/api/table/type/ArrayType.java |  15 ++
 .../apache/seatunnel/api/table/type/BasicType.java |  48 +++++-
 .../apache/seatunnel/api/table/type/DataType.java  |   8 +-
 .../apache/seatunnel/api/table/type/EnumType.java  |  13 ++
 .../apache/seatunnel/api/table/type/ListType.java  |  12 +-
 .../apache/seatunnel/api/table/type/MapType.java   |  29 ++++
 .../apache/seatunnel/api/table/type/PojoType.java  |  25 ++-
 .../seatunnel/api/table/type/TimestampType.java    |  16 ++
 .../flink/types/ArrayTypeConverter.java            |  60 ++++++++
 .../flink/types/BasicTypeConverter.java            |  64 +++++---
 .../flink/types/FlinkTypeConverter.java            |   7 +-
 .../translation/flink/types/PojoTypeConverter.java |  50 ++++++
 .../flink/types/TimestampTypeConverter.java        |  16 +-
 .../flink/utils/TypeConverterUtils.java            | 170 +++++++++++++++++++++
 16 files changed, 492 insertions(+), 69 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index 01d2a287..39304916 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -34,11 +34,11 @@ public abstract class Column {
     /**
      * Data type of the column.
      */
-    protected final DataType dataType;
+    protected final DataType<?> dataType;
 
     protected final String comment;
 
-    private Column(String name, DataType dataType, String comment) {
+    private Column(String name, DataType<?> dataType, String comment) {
         this.name = name;
         this.dataType = dataType;
         this.comment = comment;
@@ -47,7 +47,7 @@ public abstract class Column {
     /**
      * Creates a regular table column that represents physical data.
      */
-    public static PhysicalColumn physical(String name, DataType dataType) {
+    public static PhysicalColumn physical(String name, DataType<?> dataType) {
         return new PhysicalColumn(name, dataType);
     }
 
@@ -58,7 +58,7 @@ public abstract class Column {
      * <p>Allows to specify whether the column is virtual or not.
      */
     public static MetadataColumn metadata(
-            String name, DataType dataType, String metadataKey) {
+            String name, DataType<?> dataType, String metadataKey) {
         return new MetadataColumn(name, dataType, metadataKey);
     }
 
@@ -76,7 +76,7 @@ public abstract class Column {
     /**
      * Returns the data type of this column.
      */
-    public DataType getDataType() {
+    public DataType<?> getDataType() {
         return this.dataType;
     }
 
@@ -97,7 +97,7 @@ public abstract class Column {
     /**
      * Returns a copy of the column with a replaced {@link DataType}.
      */
-    public abstract Column copy(DataType newType);
+    public abstract Column copy(DataType<?> newType);
 
     @Override
     public boolean equals(Object o) {
@@ -127,11 +127,11 @@ public abstract class Column {
      */
     public static final class PhysicalColumn extends Column {
 
-        private PhysicalColumn(String name, DataType dataType) {
+        private PhysicalColumn(String name, DataType<?> dataType) {
             this(name, dataType, null);
         }
 
-        private PhysicalColumn(String name, DataType dataType, String comment) {
+        private PhysicalColumn(String name, DataType<?> dataType, String comment) {
             super(name, dataType, comment);
         }
 
@@ -149,7 +149,7 @@ public abstract class Column {
         }
 
         @Override
-        public Column copy(DataType newDataType) {
+        public Column copy(DataType<?> newDataType) {
             return new PhysicalColumn(name, newDataType, comment);
         }
     }
@@ -162,13 +162,13 @@ public abstract class Column {
         private final String metadataKey;
 
         private MetadataColumn(
-                String name, DataType dataType, String metadataKey) {
+                String name, DataType<?> dataType, String metadataKey) {
             this(name, dataType, metadataKey, null);
         }
 
         private MetadataColumn(
                 String name,
-                DataType dataType,
+                DataType<?> dataType,
                 String metadataKey,
                 String comment) {
             super(name, dataType, comment);
@@ -193,7 +193,7 @@ public abstract class Column {
         }
 
         @Override
-        public Column copy(DataType newDataType) {
+        public Column copy(DataType<?> newDataType) {
             return new MetadataColumn(name, newDataType, metadataKey, comment);
         }
 
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
index 3fc90e17..66188580 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java
@@ -28,7 +28,7 @@ import java.util.Map;
  */
 public interface SupportReadingMetadata {
 
-    Map<String, DataType> listReadableMetadata(CatalogTable catalogTable);
+    Map<String, DataType<?>> listReadableMetadata(CatalogTable catalogTable);
 
-    void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType dataType);
+    void applyReadableMetadata(CatalogTable catalogTable, List<String> metadataKeys, DataType<?> dataType);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
new file mode 100644
index 00000000..1e24cd25
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -0,0 +1,15 @@
+package org.apache.seatunnel.api.table.type;
+
+public class ArrayType<T> implements DataType<T> {
+
+    private final BasicType<T> elementType;
+
+    public ArrayType(BasicType<T> elementType) {
+        this.elementType = elementType;
+    }
+
+    public BasicType<T> getElementType() {
+        return elementType;
+    }
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 13489f22..c2834f7e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.api.table.type;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
 import java.util.Date;
 
 public class BasicType<T> implements DataType<T> {
@@ -29,19 +32,50 @@ public class BasicType<T> implements DataType<T> {
     public static final BasicType<Long> LONG = new BasicType<>(Long.class);
     public static final BasicType<Float> FLOAT = new BasicType<>(Float.class);
     public static final BasicType<Byte> BYTE = new BasicType<>(Byte.class);
+    public static final BasicType<Short> SHORT = new BasicType<>(Short.class);
+    public static final BasicType<Character> CHARACTER = new BasicType<>(Character.class);
+    public static final BasicType<BigInteger> BIG_INTEGER = new BasicType<>(BigInteger.class);
+    public static final BasicType<BigDecimal> BIG_DECIMAL = new BasicType<>(BigDecimal.class);
+    public static final BasicType<Instant> INSTANT = new BasicType<>(Instant.class);
     public static final BasicType<Void> NULL = new BasicType<>(Void.class);
 
-    private final Class<T> typeClass;
+    /**
+     * The physical type class.
+     */
+    private final Class<T> physicalTypeClass;
 
-    public BasicType(Class<T> typeClass) {
-        if (typeClass == null) {
-            throw new IllegalArgumentException("typeClass cannot be null");
+    public BasicType(Class<T> physicalTypeClass) {
+        if (physicalTypeClass == null) {
+            throw new IllegalArgumentException("physicalTypeClass cannot be null");
         }
-        this.typeClass = typeClass;
+        this.physicalTypeClass = physicalTypeClass;
+    }
+
+    public Class<T> getPhysicalTypeClass() {
+        return this.physicalTypeClass;
+    }
+
+    @Override
+    public int hashCode() {
+        return this.physicalTypeClass.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        BasicType<?> other = (BasicType<?>) obj;
+        return this.physicalTypeClass.equals(other.physicalTypeClass);
     }
 
     @Override
-    public Class<T> getTypeClass() {
-        return this.typeClass;
+    public String toString() {
+        return "BasicType{" +
+            "physicalTypeClass=" + physicalTypeClass +
+            '}';
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
index 0c65f471..2183ad18 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
@@ -18,15 +18,9 @@
 package org.apache.seatunnel.api.table.type;
 
 /**
- * Data type of column in SeaTunnel.
+ * Logic data type of column in SeaTunnel.
  */
 public interface DataType<T> {
 
-    /**
-     * The type class.
-     *
-     * @return the type class.
-     */
-    Class<T> getTypeClass();
 
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
new file mode 100644
index 00000000..9b4f8517
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/EnumType.java
@@ -0,0 +1,13 @@
+package org.apache.seatunnel.api.table.type;
+
+public class EnumType<T extends Enum<T>> implements DataType<T> {
+    private final Class<T> enumClass;
+
+    public EnumType(Class<T> enumClass) {
+        this.enumClass = enumClass;
+    }
+
+    public Class<T> getEnumClass() {
+        return enumClass;
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 54ccad0b..41c57004 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -19,15 +19,13 @@ package org.apache.seatunnel.api.table.type;
 
 public class ListType<T> implements DataType<T> {
 
-    // todo: use DataType?
-    private final Class<T> typeClass;
+    private final DataType<T> elementType;
 
-    public ListType(Class<T> typeClass) {
-        this.typeClass = typeClass;
+    public ListType(DataType<T> elementType) {
+        this.elementType = elementType;
     }
 
-    @Override
-    public Class<T> getTypeClass() {
-        return typeClass;
+    public DataType<T> getElementType() {
+        return elementType;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
new file mode 100644
index 00000000..6c8c24b7
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MapType.java
@@ -0,0 +1,29 @@
+package org.apache.seatunnel.api.table.type;
+
+import java.util.Map;
+
+public class MapType<K, V> implements DataType<Map<K, V>> {
+
+    private final DataType<K> keyType;
+    private final DataType<V> valueType;
+
+    public MapType(DataType<K> keyType, DataType<V> valueType) {
+        if (keyType == null) {
+            throw new IllegalArgumentException("keyType cannot be null");
+        }
+        if (valueType == null) {
+            throw new IllegalArgumentException("valueType cannot be null");
+        }
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    public DataType<K> getKeyType() {
+        return keyType;
+    }
+
+    public DataType<V> getValueType() {
+        return valueType;
+    }
+
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index e70fcd62..2892d011 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -17,18 +17,29 @@
 
 package org.apache.seatunnel.api.table.type;
 
-// todo: we may don't need to pojo type
+import java.lang.reflect.Field;
+
 public class PojoType<T> implements DataType<T> {
 
-    private final Class<T> typeClass;
+    private final Class<T> pojoClass;
+    private final Field[] fields;
+    private final DataType<?>[] fieldTypes;
+
+    public PojoType(Class<T> pojoClass, Field[] fields, DataType<?>[] fieldTypes) {
+        this.pojoClass = pojoClass;
+        this.fields = fields;
+        this.fieldTypes = fieldTypes;
+    }
 
-    public PojoType(Class<T> typeClass) {
-        this.typeClass = typeClass;
+    public Class<T> getPojoClass() {
+        return pojoClass;
+    }
 
+    public Field[] getFields() {
+        return fields;
     }
 
-    @Override
-    public Class<T> getTypeClass() {
-        return typeClass;
+    public DataType<?>[] getFieldTypes() {
+        return fieldTypes;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
new file mode 100644
index 00000000..390ce453
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TimestampType.java
@@ -0,0 +1,16 @@
+package org.apache.seatunnel.api.table.type;
+
+import java.sql.Timestamp;
+
+public class TimestampType implements DataType<Timestamp> {
+
+    private final int precision;
+
+    public TimestampType(int precision) {
+        this.precision = precision;
+    }
+
+    public int getPrecision() {
+        return precision;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
new file mode 100644
index 00000000..d4aa1866
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+
+public class ArrayTypeConverter<T1, T2> implements FlinkTypeConverter<ArrayType<T1>, BasicArrayTypeInfo<T1, T2>> {
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public BasicArrayTypeInfo<T1, T2> convert(ArrayType<T1> arrayType) {
+        BasicType<T1> elementType = arrayType.getElementType();
+        if (BasicType.BOOLEAN.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.STRING.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.DOUBLE.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.INTEGER.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.LONG.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.FLOAT.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.BYTE.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.SHORT.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
+        }
+        if (BasicType.CHARACTER.equals(elementType)) {
+            return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
+        }
+        throw new IllegalArgumentException("Unsupported basic type: " + elementType);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
index 7a2ee1ff..ed8967f8 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -18,70 +18,98 @@
 package org.apache.seatunnel.translation.flink.types;
 
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DataType;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
 import java.util.Date;
 
-public class BasicTypeConverter<T1, T2> implements FlinkTypeConverter<T1, T2> {
+public class BasicTypeConverter<T1>
+    implements FlinkTypeConverter<BasicType<T1>, TypeInformation<T1>> {
 
-    public static final BasicTypeConverter<String, String> STRING_CONVERTER =
+    public static final BasicTypeConverter<String> STRING_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.STRING,
             BasicTypeInfo.STRING_TYPE_INFO);
 
-    public static final BasicTypeConverter<Integer, Integer> INTEGER_CONVERTER =
+    public static final BasicTypeConverter<Integer> INTEGER_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.INTEGER,
             BasicTypeInfo.INT_TYPE_INFO);
 
-    public static final BasicTypeConverter<Boolean, Boolean> BOOLEAN_CONVERTER =
+    public static final BasicTypeConverter<Boolean> BOOLEAN_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.BOOLEAN,
             BasicTypeInfo.BOOLEAN_TYPE_INFO);
 
-    public static final BasicTypeConverter<Date, Date> DATE_CONVERTER =
+    public static final BasicTypeConverter<Date> DATE_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.DATE,
             BasicTypeInfo.DATE_TYPE_INFO);
 
-    public static final BasicTypeConverter<Double, Double> DOUBLE_CONVERTER =
+    public static final BasicTypeConverter<Double> DOUBLE_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.DOUBLE,
             BasicTypeInfo.DOUBLE_TYPE_INFO);
 
-    public static final BasicTypeConverter<Long, Long> LONG_CONVERTER =
+    public static final BasicTypeConverter<Long> LONG_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.LONG,
             BasicTypeInfo.LONG_TYPE_INFO);
 
-    public static final BasicTypeConverter<Float, Float> FLOAT_CONVERTER =
+    public static final BasicTypeConverter<Float> FLOAT_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.FLOAT,
             BasicTypeInfo.FLOAT_TYPE_INFO);
 
-    public static final BasicTypeConverter<Byte, Byte> BYTE_CONVERTER =
+    public static final BasicTypeConverter<Byte> BYTE_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.BYTE,
             BasicTypeInfo.BYTE_TYPE_INFO);
 
-    public static final BasicTypeConverter<Void, Void> NULL_CONVERTER =
+    public static final BasicTypeConverter<Short> SHORT_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.SHORT,
+            BasicTypeInfo.SHORT_TYPE_INFO);
+
+    public static final BasicTypeConverter<Character> CHARACTER_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.CHARACTER,
+            BasicTypeInfo.CHAR_TYPE_INFO);
+
+    public static final BasicTypeConverter<BigInteger> BIG_INTEGER_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BIG_INTEGER,
+            BasicTypeInfo.BIG_INT_TYPE_INFO);
+
+    public static final BasicTypeConverter<BigDecimal> BIG_DECIMAL =
+        new BasicTypeConverter<>(
+            BasicType.BIG_DECIMAL,
+            BasicTypeInfo.BIG_DEC_TYPE_INFO);
+
+    public static final BasicTypeConverter<Instant> INSTANT_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.INSTANT,
+            BasicTypeInfo.INSTANT_TYPE_INFO);
+
+    public static final BasicTypeConverter<Void> NULL_CONVERTER =
         new BasicTypeConverter<>(
             BasicType.NULL,
             BasicTypeInfo.VOID_TYPE_INFO);
 
-    private final DataType<T1> dataType;
-    private final TypeInformation<T2> typeInformation;
+    private final BasicType<T1> seaTunnelDataType;
+    private final TypeInformation<T1> flinkTypeInformation;
 
-    public BasicTypeConverter(DataType<T1> dataType, TypeInformation<T2> typeInformation) {
-        this.dataType = dataType;
-        this.typeInformation = typeInformation;
+    public BasicTypeConverter(BasicType<T1> seaTunnelDataType, TypeInformation<T1> flinkTypeInformation) {
+        this.seaTunnelDataType = seaTunnelDataType;
+        this.flinkTypeInformation = flinkTypeInformation;
     }
 
     @Override
-    public TypeInformation<T2> convert(DataType<T1> dataType) {
-        return typeInformation;
+    public TypeInformation<T1> convert(BasicType<T1> seaTunnelDataType) {
+        return flinkTypeInformation;
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index 78e6d5ab..f5912f36 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -25,16 +25,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 /**
  * Convert SeaTunnel {@link DataType} to flink type.
  */
-public interface FlinkTypeConverter<T1, T2>
-    extends Converter<DataType<T1>, TypeInformation<T2>> {
+public interface FlinkTypeConverter<T1, T2> extends Converter<T1, T2> {
 
     /**
      * Convert SeaTunnel {@link DataType} to flink {@link  TypeInformation}.
      *
-     * @param dataType SeaTunnel {@link DataType}
+     * @param seaTunnelDataType SeaTunnel {@link DataType}
      * @return flink {@link TypeInformation}
      */
     @Override
-    TypeInformation<T2> convert(DataType<T1> dataType);
+    T2 convert(T1 seaTunnelDataType);
 
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
new file mode 100644
index 00000000..efd6e713
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class PojoTypeConverter<T1> implements FlinkTypeConverter<PojoType<T1>, PojoTypeInfo<T1>> {
+
+    @Override
+    public PojoTypeInfo<T1> convert(PojoType<T1> seaTunnelDataType) {
+        Class<T1> pojoClass = seaTunnelDataType.getPojoClass();
+        Field[] fields = seaTunnelDataType.getFields();
+        DataType<?>[] fieldTypes = seaTunnelDataType.getFieldTypes();
+        if (ArrayUtils.isEmpty(fields)) {
+            return new PojoTypeInfo<>(pojoClass, Collections.emptyList());
+        }
+        List<PojoField> pojoFieldList = new ArrayList<>(fields.length);
+        for (int i = 0; i < fields.length; i++) {
+            PojoField pojoField = new PojoField(fields[i], TypeConverterUtils.convertType(fieldTypes[i]));
+            pojoFieldList.add(pojoField);
+        }
+        return new PojoTypeInfo<>(pojoClass, pojoFieldList);
+    }
+}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
similarity index 59%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
rename to seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
index 0dfdacf4..b8bf1be7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/TimestampTypeConverter.java
@@ -15,16 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.table.type;
+package org.apache.seatunnel.translation.flink.types;
 
-public class DateTimeType implements DataType {
+import org.apache.seatunnel.api.table.type.TimestampType;
 
-    public DateTimeType() {
+import org.apache.flink.table.runtime.typeutils.TimestampDataTypeInfo;
+
+public class TimestampTypeConverter implements FlinkTypeConverter<TimestampType, TimestampDataTypeInfo> {
+
+    public static final TimestampTypeConverter INSTANCE = new TimestampTypeConverter();
+
+    private TimestampTypeConverter() {
 
     }
 
     @Override
-    public Class getTypeClass() {
-        return null;
+    public TimestampDataTypeInfo convert(TimestampType seaTunnelDataType) {
+        return new TimestampDataTypeInfo(seaTunnelDataType.getPrecision());
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
new file mode 100644
index 00000000..ae155306
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.utils;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.EnumType;
+import org.apache.seatunnel.api.table.type.ListType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PojoType;
+import org.apache.seatunnel.api.table.type.TimestampType;
+import org.apache.seatunnel.translation.flink.types.ArrayTypeConverter;
+import org.apache.seatunnel.translation.flink.types.BasicTypeConverter;
+import org.apache.seatunnel.translation.flink.types.PojoTypeConverter;
+import org.apache.seatunnel.translation.flink.types.TimestampTypeConverter;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+
+public class TypeConverterUtils {
+
+    @SuppressWarnings("unchecked")
+    public static <T1, T2> TypeInformation<T2> convertType(DataType<T1> dataType) {
+        if (dataType instanceof BasicType) {
+            return (TypeInformation<T2>) convertBasicType((BasicType<T1>) dataType);
+        }
+        if (dataType instanceof TimestampType) {
+            return (TypeInformation<T2>)
+                TimestampTypeConverter.INSTANCE.convert((TimestampType) dataType);
+        }
+        if (dataType instanceof ArrayType) {
+            return (TypeInformation<T2>) convertArrayType((ArrayType<T1>) dataType);
+        }
+        if (dataType instanceof ListType) {
+            return (TypeInformation<T2>) covertListType((ListType<T1>) dataType);
+        }
+        if (dataType instanceof EnumType) {
+
+            return (TypeInformation<T2>) coverEnumType((EnumType<?>) dataType);
+        }
+        if (dataType instanceof MapType) {
+            return (TypeInformation<T2>) convertMapType((MapType<?, ?>) dataType);
+        }
+        if (dataType instanceof PojoType) {
+            return (TypeInformation<T2>) convertPojoType((PojoType<T1>) dataType);
+        }
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> TypeInformation<T> convertBasicType(BasicType<T> basicType) {
+        Class<T> physicalTypeClass = basicType.getPhysicalTypeClass();
+        if (physicalTypeClass == Boolean.class) {
+            BasicType<Boolean> booleanBasicType = (BasicType<Boolean>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.BOOLEAN_CONVERTER.convert(booleanBasicType);
+        }
+        if (physicalTypeClass == String.class) {
+            BasicType<String> stringBasicType = (BasicType<String>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.STRING_CONVERTER.convert(stringBasicType);
+        }
+        if (physicalTypeClass == Date.class) {
+            BasicType<Date> dateBasicType = (BasicType<Date>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.DATE_CONVERTER.convert(dateBasicType);
+        }
+        if (physicalTypeClass == Double.class) {
+            BasicType<Double> doubleBasicType = (BasicType<Double>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.DOUBLE_CONVERTER.convert(doubleBasicType);
+        }
+        if (physicalTypeClass == Integer.class) {
+            BasicType<Integer> integerBasicType = (BasicType<Integer>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.INTEGER_CONVERTER.convert(integerBasicType);
+        }
+        if (physicalTypeClass == Long.class) {
+            BasicType<Long> longBasicType = (BasicType<Long>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.LONG_CONVERTER.convert(longBasicType);
+        }
+        if (physicalTypeClass == Float.class) {
+            BasicType<Float> floatBasicType = (BasicType<Float>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.FLOAT_CONVERTER.convert(floatBasicType);
+        }
+        if (physicalTypeClass == Byte.class) {
+            BasicType<Byte> byteBasicType = (BasicType<Byte>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.BYTE_CONVERTER.convert(byteBasicType);
+        }
+        if (physicalTypeClass == Short.class) {
+            BasicType<Short> shortBasicType = (BasicType<Short>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.SHORT_CONVERTER.convert(shortBasicType);
+        }
+        if (physicalTypeClass == Character.class) {
+            BasicType<Character> characterBasicType = (BasicType<Character>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.CHARACTER_CONVERTER.convert(characterBasicType);
+        }
+        if (physicalTypeClass == BigInteger.class) {
+            BasicType<BigInteger> bigIntegerBasicType = (BasicType<BigInteger>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.BIG_INTEGER_CONVERTER.convert(bigIntegerBasicType);
+        }
+        if (physicalTypeClass == BigDecimal.class) {
+            BasicType<BigDecimal> bigDecimalBasicType = (BasicType<BigDecimal>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+        }
+        if (physicalTypeClass == Void.class) {
+            BasicType<Void> voidBasicType = (BasicType<Void>) basicType;
+            return (TypeInformation<T>)
+                BasicTypeConverter.NULL_CONVERTER.convert(voidBasicType);
+        }
+        throw new IllegalArgumentException("Unsupported basic type: " + basicType);
+    }
+
+    public static <T1, T2> BasicArrayTypeInfo<T1, T2> convertArrayType(ArrayType<T1> arrayType) {
+        ArrayTypeConverter<T1, T2> arrayTypeConverter = new ArrayTypeConverter<>();
+        return arrayTypeConverter.convert(arrayType);
+    }
+
+    public static <T> ListTypeInfo<T> covertListType(ListType<T> listType) {
+        DataType<T> elementType = listType.getElementType();
+        return new ListTypeInfo<>(convertType(elementType));
+    }
+
+    public static <T extends Enum<T>> EnumTypeInfo<T> coverEnumType(EnumType<T> enumType) {
+        Class<T> enumClass = enumType.getEnumClass();
+        return new EnumTypeInfo<>(enumClass);
+    }
+
+    public static <K, V> MapTypeInfo<K, V> convertMapType(MapType<K, V> mapType) {
+        DataType<K> keyType = mapType.getKeyType();
+        DataType<V> valueType = mapType.getValueType();
+        return new MapTypeInfo<>(convertType(keyType), convertType(valueType));
+    }
+
+    public static <T> PojoTypeInfo<T> convertPojoType(PojoType<T> pojoType) {
+        PojoTypeConverter<T> pojoTypeConverter = new PojoTypeConverter<>();
+        return pojoTypeConverter.convert(pojoType);
+    }
+}