You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/05 14:39:32 UTC

[incubator-seatunnel] branch api-draft updated: Add flink datatype converter (#1801)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 1935d712 Add flink datatype converter (#1801)
1935d712 is described below

commit 1935d712456319bee07d7f18c6e7d9e0262c601e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu May 5 22:39:26 2022 +0800

    Add flink datatype converter (#1801)
---
 .../apache/seatunnel/api/table/type/BasicType.java | 34 +++++++--
 .../seatunnel/api/table/type/BooleanType.java      | 26 -------
 .../table/type/{NullType.java => Converter.java}   |  8 +-
 .../apache/seatunnel/api/table/type/DataType.java  |  7 +-
 .../seatunnel/api/table/type/DateTimeType.java     |  5 --
 .../seatunnel/api/table/type/DoubleType.java       | 27 -------
 .../apache/seatunnel/api/table/type/IntType.java   | 27 -------
 .../apache/seatunnel/api/table/type/ListType.java  | 14 ++--
 .../apache/seatunnel/api/table/type/LongType.java  | 27 -------
 .../table/type/{DateType.java => PojoType.java}    | 15 ++--
 .../seatunnel/api/table/type/StringType.java       | 27 -------
 .../flink/types/BasicTypeConverter.java            | 87 ++++++++++++++++++++++
 .../flink/types/FlinkTypeConverter.java            | 32 +++++++-
 .../translation/flink/types/StringConverter.java   | 14 ----
 14 files changed, 169 insertions(+), 181 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 8135fb35..13489f22 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -1,7 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.seatunnel.api.table.type;
 
+import java.util.Date;
+
 public class BasicType<T> implements DataType<T> {
 
+    public static final BasicType<Boolean> BOOLEAN = new BasicType<>(Boolean.class);
+    public static final BasicType<String> STRING = new BasicType<>(String.class);
+    public static final BasicType<Date> DATE = new BasicType<>(Date.class);
+    public static final BasicType<Double> DOUBLE = new BasicType<>(Double.class);
+    public static final BasicType<Integer> INTEGER = new BasicType<>(Integer.class);
+    public static final BasicType<Long> LONG = new BasicType<>(Long.class);
+    public static final BasicType<Float> FLOAT = new BasicType<>(Float.class);
+    public static final BasicType<Byte> BYTE = new BasicType<>(Byte.class);
+    public static final BasicType<Void> NULL = new BasicType<>(Void.class);
+
     private final Class<T> typeClass;
 
     public BasicType(Class<T> typeClass) {
@@ -11,11 +40,6 @@ public class BasicType<T> implements DataType<T> {
         this.typeClass = typeClass;
     }
 
-    @Override
-    public boolean isBasicType() {
-        return true;
-    }
-
     @Override
     public Class<T> getTypeClass() {
         return this.typeClass;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
deleted file mode 100644
index 5f2be918..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class BooleanType extends BasicType<Boolean> {
-    private static final BooleanType INSTANCE = new BooleanType(Boolean.class);
-
-    private BooleanType(Class<Boolean> typeClass) {
-        super(typeClass);
-    }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
similarity index 81%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
index 781a906b..662f2627 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
@@ -17,11 +17,7 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public class NullType extends BasicType<Void> {
+public interface Converter<T1, T2> {
 
-    private static final NullType INSTANCE = new NullType(Void.class);
-
-    private NullType(Class<Void> typeClass) {
-        super(typeClass);
-    }
+    T2 convert(T1 dataType);
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
index 4cc51c3c..0c65f471 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
@@ -22,8 +22,11 @@ package org.apache.seatunnel.api.table.type;
  */
 public interface DataType<T> {
 
-    boolean isBasicType();
-
+    /**
+     * The type class.
+     *
+     * @return the type class.
+     */
     Class<T> getTypeClass();
 
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
index 25d1bb95..0dfdacf4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
@@ -23,11 +23,6 @@ public class DateTimeType implements DataType {
 
     }
 
-    @Override
-    public boolean isBasicType() {
-        return false;
-    }
-
     @Override
     public Class getTypeClass() {
         return null;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
deleted file mode 100644
index b5ddaa87..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class DoubleType extends BasicType<Double> {
-
-    public static final DoubleType INSTANCE = new DoubleType(Double.class);
-
-    private DoubleType(Class<Double> typeClass) {
-        super(typeClass);
-    }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
deleted file mode 100644
index fdcb8abf..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class IntType extends BasicType<Integer> {
-
-    private static final IntType INSTANCE = new IntType(Integer.class);
-
-    private IntType(Class<Integer> typeClass) {
-        super(typeClass);
-    }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 9a7fef45..54ccad0b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -17,15 +17,17 @@
 
 package org.apache.seatunnel.api.table.type;
 
-public class ListType implements DataType {
+public class ListType<T> implements DataType<T> {
 
-    @Override
-    public boolean isBasicType() {
-        return false;
+    // todo: use DataType?
+    private final Class<T> typeClass;
+
+    public ListType(Class<T> typeClass) {
+        this.typeClass = typeClass;
     }
 
     @Override
-    public Class getTypeClass() {
-        return null;
+    public Class<T> getTypeClass() {
+        return typeClass;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
deleted file mode 100644
index 894a48b4..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class LongType extends BasicType<Long> {
-
-    private static final LongType INSTANCE = new LongType(Long.class);
-
-    private LongType(Class<Long> typeClass) {
-        super(typeClass);
-    }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
similarity index 74%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index 8ab7c557..e70fcd62 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -17,13 +17,18 @@
 
 package org.apache.seatunnel.api.table.type;
 
-import java.util.Date;
+// todo: we may don't need to pojo type
+public class PojoType<T> implements DataType<T> {
 
-public class DateType extends BasicType<Date> {
+    private final Class<T> typeClass;
 
-    private static final DateType INSTANCE = new DateType(Date.class);
+    public PojoType(Class<T> typeClass) {
+        this.typeClass = typeClass;
 
-    private DateType(Class<Date> typeClass) {
-        super(typeClass);
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return typeClass;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
deleted file mode 100644
index c2247d9f..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class StringType extends BasicType<String> {
-
-    public static final StringType INSTANCE = new StringType(String.class);
-
-    private StringType(Class<String> typeClass) {
-        super(typeClass);
-    }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
new file mode 100644
index 00000000..7a2ee1ff
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DataType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Date;
+
+public class BasicTypeConverter<T1, T2> implements FlinkTypeConverter<T1, T2> {
+
+    public static final BasicTypeConverter<String, String> STRING_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.STRING,
+            BasicTypeInfo.STRING_TYPE_INFO);
+
+    public static final BasicTypeConverter<Integer, Integer> INTEGER_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.INTEGER,
+            BasicTypeInfo.INT_TYPE_INFO);
+
+    public static final BasicTypeConverter<Boolean, Boolean> BOOLEAN_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BOOLEAN,
+            BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+    public static final BasicTypeConverter<Date, Date> DATE_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.DATE,
+            BasicTypeInfo.DATE_TYPE_INFO);
+
+    public static final BasicTypeConverter<Double, Double> DOUBLE_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.DOUBLE,
+            BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+    public static final BasicTypeConverter<Long, Long> LONG_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.LONG,
+            BasicTypeInfo.LONG_TYPE_INFO);
+
+    public static final BasicTypeConverter<Float, Float> FLOAT_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.FLOAT,
+            BasicTypeInfo.FLOAT_TYPE_INFO);
+
+    public static final BasicTypeConverter<Byte, Byte> BYTE_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.BYTE,
+            BasicTypeInfo.BYTE_TYPE_INFO);
+
+    public static final BasicTypeConverter<Void, Void> NULL_CONVERTER =
+        new BasicTypeConverter<>(
+            BasicType.NULL,
+            BasicTypeInfo.VOID_TYPE_INFO);
+
+    private final DataType<T1> dataType;
+    private final TypeInformation<T2> typeInformation;
+
+    public BasicTypeConverter(DataType<T1> dataType, TypeInformation<T2> typeInformation) {
+        this.dataType = dataType;
+        this.typeInformation = typeInformation;
+    }
+
+    @Override
+    public TypeInformation<T2> convert(DataType<T1> dataType) {
+        return typeInformation;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index f07fa71c..78e6d5ab 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -1,16 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.seatunnel.translation.flink.types;
 
+import org.apache.seatunnel.api.table.type.Converter;
 import org.apache.seatunnel.api.table.type.DataType;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * Convert SeaTunnel {@link DataType} to flink type.
- *
- * @param <T> target flink type
  */
-public interface FlinkTypeConverter<T> {
+public interface FlinkTypeConverter<T1, T2>
+    extends Converter<DataType<T1>, TypeInformation<T2>> {
 
-    TypeInformation<T> convert(DataType dataType);
+    /**
+     * Convert SeaTunnel {@link DataType} to flink {@link  TypeInformation}.
+     *
+     * @param dataType SeaTunnel {@link DataType}
+     * @return flink {@link TypeInformation}
+     */
+    @Override
+    TypeInformation<T2> convert(DataType<T1> dataType);
 
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java
deleted file mode 100644
index 3b26491f..00000000
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.seatunnel.translation.flink.types;
-
-import org.apache.seatunnel.api.table.type.DataType;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class StringConverter implements FlinkTypeConverter<String> {
-
-    @Override
-    public TypeInformation<String> convert(DataType dataType) {
-
-        return null;
-    }
-}