You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/05 14:39:32 UTC
[incubator-seatunnel] branch api-draft updated: Add flink datatype converter (#1801)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 1935d712 Add flink datatype converter (#1801)
1935d712 is described below
commit 1935d712456319bee07d7f18c6e7d9e0262c601e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Thu May 5 22:39:26 2022 +0800
Add flink datatype converter (#1801)
---
.../apache/seatunnel/api/table/type/BasicType.java | 34 +++++++--
.../seatunnel/api/table/type/BooleanType.java | 26 -------
.../table/type/{NullType.java => Converter.java} | 8 +-
.../apache/seatunnel/api/table/type/DataType.java | 7 +-
.../seatunnel/api/table/type/DateTimeType.java | 5 --
.../seatunnel/api/table/type/DoubleType.java | 27 -------
.../apache/seatunnel/api/table/type/IntType.java | 27 -------
.../apache/seatunnel/api/table/type/ListType.java | 14 ++--
.../apache/seatunnel/api/table/type/LongType.java | 27 -------
.../table/type/{DateType.java => PojoType.java} | 15 ++--
.../seatunnel/api/table/type/StringType.java | 27 -------
.../flink/types/BasicTypeConverter.java | 87 ++++++++++++++++++++++
.../flink/types/FlinkTypeConverter.java | 32 +++++++-
.../translation/flink/types/StringConverter.java | 14 ----
14 files changed, 169 insertions(+), 181 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 8135fb35..13489f22 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -1,7 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.api.table.type;
+import java.util.Date;
+
public class BasicType<T> implements DataType<T> {
+ public static final BasicType<Boolean> BOOLEAN = new BasicType<>(Boolean.class);
+ public static final BasicType<String> STRING = new BasicType<>(String.class);
+ public static final BasicType<Date> DATE = new BasicType<>(Date.class);
+ public static final BasicType<Double> DOUBLE = new BasicType<>(Double.class);
+ public static final BasicType<Integer> INTEGER = new BasicType<>(Integer.class);
+ public static final BasicType<Long> LONG = new BasicType<>(Long.class);
+ public static final BasicType<Float> FLOAT = new BasicType<>(Float.class);
+ public static final BasicType<Byte> BYTE = new BasicType<>(Byte.class);
+ public static final BasicType<Void> NULL = new BasicType<>(Void.class);
+
private final Class<T> typeClass;
public BasicType(Class<T> typeClass) {
@@ -11,11 +40,6 @@ public class BasicType<T> implements DataType<T> {
this.typeClass = typeClass;
}
- @Override
- public boolean isBasicType() {
- return true;
- }
-
@Override
public Class<T> getTypeClass() {
return this.typeClass;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
deleted file mode 100644
index 5f2be918..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BooleanType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class BooleanType extends BasicType<Boolean> {
- private static final BooleanType INSTANCE = new BooleanType(Boolean.class);
-
- private BooleanType(Class<Boolean> typeClass) {
- super(typeClass);
- }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
similarity index 81%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
index 781a906b..662f2627 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/NullType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Converter.java
@@ -17,11 +17,7 @@
package org.apache.seatunnel.api.table.type;
-public class NullType extends BasicType<Void> {
+public interface Converter<T1, T2> {
- private static final NullType INSTANCE = new NullType(Void.class);
-
- private NullType(Class<Void> typeClass) {
- super(typeClass);
- }
+ T2 convert(T1 dataType);
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
index 4cc51c3c..0c65f471 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DataType.java
@@ -22,8 +22,11 @@ package org.apache.seatunnel.api.table.type;
*/
public interface DataType<T> {
- boolean isBasicType();
-
+ /**
+ * The type class.
+ *
+ * @return the type class.
+ */
Class<T> getTypeClass();
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
index 25d1bb95..0dfdacf4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateTimeType.java
@@ -23,11 +23,6 @@ public class DateTimeType implements DataType {
}
- @Override
- public boolean isBasicType() {
- return false;
- }
-
@Override
public Class getTypeClass() {
return null;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
deleted file mode 100644
index b5ddaa87..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DoubleType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class DoubleType extends BasicType<Double> {
-
- public static final DoubleType INSTANCE = new DoubleType(Double.class);
-
- private DoubleType(Class<Double> typeClass) {
- super(typeClass);
- }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
deleted file mode 100644
index fdcb8abf..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/IntType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class IntType extends BasicType<Integer> {
-
- private static final IntType INSTANCE = new IntType(Integer.class);
-
- private IntType(Class<Integer> typeClass) {
- super(typeClass);
- }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
index 9a7fef45..54ccad0b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ListType.java
@@ -17,15 +17,17 @@
package org.apache.seatunnel.api.table.type;
-public class ListType implements DataType {
+public class ListType<T> implements DataType<T> {
- @Override
- public boolean isBasicType() {
- return false;
+ // todo: use DataType?
+ private final Class<T> typeClass;
+
+ public ListType(Class<T> typeClass) {
+ this.typeClass = typeClass;
}
@Override
- public Class getTypeClass() {
- return null;
+ public Class<T> getTypeClass() {
+ return typeClass;
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
deleted file mode 100644
index 894a48b4..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/LongType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class LongType extends BasicType<Long> {
-
- private static final LongType INSTANCE = new LongType(Long.class);
-
- private LongType(Class<Long> typeClass) {
- super(typeClass);
- }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
similarity index 74%
rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
index 8ab7c557..e70fcd62 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/DateType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/PojoType.java
@@ -17,13 +17,18 @@
package org.apache.seatunnel.api.table.type;
-import java.util.Date;
+// todo: we may don't need to pojo type
+public class PojoType<T> implements DataType<T> {
-public class DateType extends BasicType<Date> {
+ private final Class<T> typeClass;
- private static final DateType INSTANCE = new DateType(Date.class);
+ public PojoType(Class<T> typeClass) {
+ this.typeClass = typeClass;
- private DateType(Class<Date> typeClass) {
- super(typeClass);
+ }
+
+ @Override
+ public Class<T> getTypeClass() {
+ return typeClass;
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
deleted file mode 100644
index c2247d9f..00000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/StringType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.type;
-
-public class StringType extends BasicType<String> {
-
- public static final StringType INSTANCE = new StringType(String.class);
-
- private StringType(Class<String> typeClass) {
- super(typeClass);
- }
-}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
new file mode 100644
index 00000000..7a2ee1ff
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DataType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Date;
+
+public class BasicTypeConverter<T1, T2> implements FlinkTypeConverter<T1, T2> {
+
+ public static final BasicTypeConverter<String, String> STRING_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.STRING,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ public static final BasicTypeConverter<Integer, Integer> INTEGER_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.INTEGER,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ public static final BasicTypeConverter<Boolean, Boolean> BOOLEAN_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BOOLEAN,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+ public static final BasicTypeConverter<Date, Date> DATE_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.DATE,
+ BasicTypeInfo.DATE_TYPE_INFO);
+
+ public static final BasicTypeConverter<Double, Double> DOUBLE_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.DOUBLE,
+ BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+ public static final BasicTypeConverter<Long, Long> LONG_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.LONG,
+ BasicTypeInfo.LONG_TYPE_INFO);
+
+ public static final BasicTypeConverter<Float, Float> FLOAT_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.FLOAT,
+ BasicTypeInfo.FLOAT_TYPE_INFO);
+
+ public static final BasicTypeConverter<Byte, Byte> BYTE_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.BYTE,
+ BasicTypeInfo.BYTE_TYPE_INFO);
+
+ public static final BasicTypeConverter<Void, Void> NULL_CONVERTER =
+ new BasicTypeConverter<>(
+ BasicType.NULL,
+ BasicTypeInfo.VOID_TYPE_INFO);
+
+ private final DataType<T1> dataType;
+ private final TypeInformation<T2> typeInformation;
+
+ public BasicTypeConverter(DataType<T1> dataType, TypeInformation<T2> typeInformation) {
+ this.dataType = dataType;
+ this.typeInformation = typeInformation;
+ }
+
+ @Override
+ public TypeInformation<T2> convert(DataType<T1> dataType) {
+ return typeInformation;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
index f07fa71c..78e6d5ab 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/FlinkTypeConverter.java
@@ -1,16 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.translation.flink.types;
+import org.apache.seatunnel.api.table.type.Converter;
import org.apache.seatunnel.api.table.type.DataType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
* Convert SeaTunnel {@link DataType} to flink type.
- *
- * @param <T> target flink type
*/
-public interface FlinkTypeConverter<T> {
+public interface FlinkTypeConverter<T1, T2>
+ extends Converter<DataType<T1>, TypeInformation<T2>> {
- TypeInformation<T> convert(DataType dataType);
+ /**
+ * Convert SeaTunnel {@link DataType} to flink {@link TypeInformation}.
+ *
+ * @param dataType SeaTunnel {@link DataType}
+ * @return flink {@link TypeInformation}
+ */
+ @Override
+ TypeInformation<T2> convert(DataType<T1> dataType);
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java
deleted file mode 100644
index 3b26491f..00000000
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/StringConverter.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.seatunnel.translation.flink.types;
-
-import org.apache.seatunnel.api.table.type.DataType;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class StringConverter implements FlinkTypeConverter<String> {
-
- @Override
- public TypeInformation<String> convert(DataType dataType) {
-
- return null;
- }
-}