You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/07 04:49:38 UTC

[incubator-seatunnel] branch api-draft updated: Add flink type converter UT (#1813)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 06e74eb5 Add flink type converter UT (#1813)
06e74eb5 is described below

commit 06e74eb5bdccad5e1ecbcb07e7096c066efa7802
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sat May 7 12:49:34 2022 +0800

    Add flink type converter UT (#1813)
---
 .../seatunnel-translation-flink/pom.xml            |   5 +
 .../flink/types/ArrayTypeConverter.java            |   1 +
 .../translation/flink/types/PojoTypeConverter.java |  21 +--
 .../flink/types/ArrayTypeConverterTest.java        | 110 ++++++++++++++++
 .../flink/types/BasicTypeConverterTest.java        | 145 +++++++++++++++++++++
 .../flink/types/PojoTypeConverterTest.java         |  67 ++++++++++
 6 files changed, 329 insertions(+), 20 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml
index 0a7b0558..e996acdc 100644
--- a/seatunnel-translation/seatunnel-translation-flink/pom.xml
+++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml
@@ -42,5 +42,10 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
index d4aa1866..c10f0450 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverter.java
@@ -27,6 +27,7 @@ public class ArrayTypeConverter<T1, T2> implements FlinkTypeConverter<ArrayType<
     @Override
     @SuppressWarnings("unchecked")
     public BasicArrayTypeInfo<T1, T2> convert(ArrayType<T1> arrayType) {
+        // todo: now we only support basic array types
         BasicType<T1> elementType = arrayType.getElementType();
         if (BasicType.BOOLEAN.equals(elementType)) {
             return (BasicArrayTypeInfo<T1, T2>) BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
index efd6e713..b2e41e56 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverter.java
@@ -17,34 +17,15 @@
 
 package org.apache.seatunnel.translation.flink.types;
 
-import org.apache.seatunnel.api.table.type.DataType;
 import org.apache.seatunnel.api.table.type.PojoType;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.flink.api.java.typeutils.PojoField;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 public class PojoTypeConverter<T1> implements FlinkTypeConverter<PojoType<T1>, PojoTypeInfo<T1>> {
 
     @Override
     public PojoTypeInfo<T1> convert(PojoType<T1> seaTunnelDataType) {
         Class<T1> pojoClass = seaTunnelDataType.getPojoClass();
-        Field[] fields = seaTunnelDataType.getFields();
-        DataType<?>[] fieldTypes = seaTunnelDataType.getFieldTypes();
-        if (ArrayUtils.isEmpty(fields)) {
-            return new PojoTypeInfo<>(pojoClass, Collections.emptyList());
-        }
-        List<PojoField> pojoFieldList = new ArrayList<>(fields.length);
-        for (int i = 0; i < fields.length; i++) {
-            PojoField pojoField = new PojoField(fields[i], TypeConverterUtils.convertType(fieldTypes[i]));
-            pojoFieldList.add(pojoField);
-        }
-        return new PojoTypeInfo<>(pojoClass, pojoFieldList);
+        return (PojoTypeInfo<T1>) PojoTypeInfo.of(pojoClass);
     }
 }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverterTest.java b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverterTest.java
new file mode 100644
index 00000000..49eec2e1
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/ArrayTypeConverterTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ArrayTypeConverterTest {
+
+    @Test
+    public void convertBooleanArrayType() {
+        ArrayType<Boolean> booleanArrayType = new ArrayType<>(BasicType.BOOLEAN);
+        ArrayTypeConverter<Boolean, Boolean> booleanArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO,
+            booleanArrayTypeConverter.convert(booleanArrayType));
+    }
+
+    @Test
+    public void convertStringArrayType() {
+        ArrayType<String> stringArrayType = new ArrayType<>(BasicType.STRING);
+        ArrayTypeConverter<String, String> stringArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+            stringArrayTypeConverter.convert(stringArrayType));
+    }
+
+    @Test
+    public void convertDoubleArrayType() {
+        ArrayType<Double> doubleArrayType = new ArrayType<>(BasicType.DOUBLE);
+        ArrayTypeConverter<Double, Double> doubleArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO,
+            doubleArrayTypeConverter.convert(doubleArrayType));
+    }
+
+    @Test
+    public void convertIntegerArrayType() {
+        ArrayType<Integer> integerArrayType = new ArrayType<>(BasicType.INTEGER);
+        ArrayTypeConverter<Integer, Integer> integerArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
+            integerArrayTypeConverter.convert(integerArrayType));
+    }
+
+    @Test
+    public void convertLongArrayType() {
+        ArrayType<Long> longArrayType = new ArrayType<>(BasicType.LONG);
+        ArrayTypeConverter<Long, Long> longArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
+            longArrayTypeConverter.convert(longArrayType));
+    }
+
+    @Test
+    public void convertFloatArrayType() {
+        ArrayType<Float> floatArrayType = new ArrayType<>(BasicType.FLOAT);
+        ArrayTypeConverter<Float, Float> floatArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO,
+            floatArrayTypeConverter.convert(floatArrayType));
+    }
+
+    @Test
+    public void convertByteArrayType() {
+        ArrayType<Byte> byteArrayType = new ArrayType<>(BasicType.BYTE);
+        ArrayTypeConverter<Byte, Byte> byteArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO,
+            byteArrayTypeConverter.convert(byteArrayType));
+    }
+
+    @Test
+    public void convertShortArrayType() {
+        ArrayType<Short> shortArrayType = new ArrayType<>(BasicType.SHORT);
+        ArrayTypeConverter<Short, Short> shortArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO,
+            shortArrayTypeConverter.convert(shortArrayType));
+    }
+
+    @Test
+    public void convertCharacterArrayType() {
+        ArrayType<Character> characterArrayType = new ArrayType<>(BasicType.CHARACTER);
+        ArrayTypeConverter<Character, Character> characterArrayTypeConverter = new ArrayTypeConverter<>();
+        Assert.assertEquals(
+            BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO,
+            characterArrayTypeConverter.convert(characterArrayType));
+    }
+
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
new file mode 100644
index 00000000..2d7b1448
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/BasicTypeConverterTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.util.Date;
+
+public class BasicTypeConverterTest {
+
+    @Test
+    public void convertStringType() {
+        BasicType<String> stringBasicType = BasicType.STRING;
+        TypeInformation<String> stringTypeInformation =
+            BasicTypeConverter.STRING_CONVERTER.convert(stringBasicType);
+        Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, stringTypeInformation);
+    }
+
+    @Test
+    public void convertIntegerType() {
+        BasicType<Integer> integerBasicType = BasicType.INTEGER;
+        TypeInformation<Integer> integerTypeInformation =
+            BasicTypeConverter.INTEGER_CONVERTER.convert(integerBasicType);
+        Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, integerTypeInformation);
+    }
+
+    @Test
+    public void convertBooleanType() {
+        BasicType<Boolean> booleanBasicType = BasicType.BOOLEAN;
+        TypeInformation<Boolean> booleanTypeInformation =
+            BasicTypeConverter.BOOLEAN_CONVERTER.convert(booleanBasicType);
+        Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, booleanTypeInformation);
+    }
+
+    @Test
+    public void convertDateType() {
+        BasicType<Date> dateBasicType = BasicType.DATE;
+        TypeInformation<Date> dateTypeInformation =
+            BasicTypeConverter.DATE_CONVERTER.convert(dateBasicType);
+        Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, dateTypeInformation);
+    }
+
+    @Test
+    public void convertDoubleType() {
+        BasicType<Double> doubleBasicType = BasicType.DOUBLE;
+        TypeInformation<Double> doubleTypeInformation =
+            BasicTypeConverter.DOUBLE_CONVERTER.convert(doubleBasicType);
+        Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, doubleTypeInformation);
+    }
+
+    @Test
+    public void convertLongType() {
+        BasicType<Long> longBasicType = BasicType.LONG;
+        TypeInformation<Long> longTypeInformation =
+            BasicTypeConverter.LONG_CONVERTER.convert(longBasicType);
+        Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, longTypeInformation);
+    }
+
+    @Test
+    public void convertFloatType() {
+        BasicType<Float> floatBasicType = BasicType.FLOAT;
+        TypeInformation<Float> floatTypeInformation =
+            BasicTypeConverter.FLOAT_CONVERTER.convert(floatBasicType);
+        Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, floatTypeInformation);
+    }
+
+    @Test
+    public void convertByteType() {
+        BasicType<Byte> byteBasicType = BasicType.BYTE;
+        TypeInformation<Byte> byteTypeInformation =
+            BasicTypeConverter.BYTE_CONVERTER.convert(byteBasicType);
+        Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, byteTypeInformation);
+    }
+
+    @Test
+    public void convertShortType() {
+        BasicType<Short> shortBasicType = BasicType.SHORT;
+        TypeInformation<Short> shortTypeInformation =
+            BasicTypeConverter.SHORT_CONVERTER.convert(shortBasicType);
+        Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, shortTypeInformation);
+    }
+
+    @Test
+    public void convertCharacterType() {
+        BasicType<Character> characterBasicType = BasicType.CHARACTER;
+        TypeInformation<Character> characterTypeInformation =
+            BasicTypeConverter.CHARACTER_CONVERTER.convert(characterBasicType);
+        Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, characterTypeInformation);
+    }
+
+    @Test
+    public void convertBigIntegerType() {
+        BasicType<BigInteger> bigIntegerBasicType = BasicType.BIG_INTEGER;
+        TypeInformation<BigInteger> bigIntegerTypeInformation =
+            BasicTypeConverter.BIG_INTEGER_CONVERTER.convert(bigIntegerBasicType);
+        Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, bigIntegerTypeInformation);
+    }
+
+    @Test
+    public void convertBigDecimalType() {
+        BasicType<BigDecimal> bigDecimalBasicType = BasicType.BIG_DECIMAL;
+        TypeInformation<BigDecimal> bigDecimalTypeInformation =
+            BasicTypeConverter.BIG_DECIMAL.convert(bigDecimalBasicType);
+        Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, bigDecimalTypeInformation);
+    }
+
+    @Test
+    public void convertInstantType() {
+        BasicType<Instant> instantBasicType = BasicType.INSTANT;
+        TypeInformation<Instant> instantTypeInformation =
+            BasicTypeConverter.INSTANT_CONVERTER.convert(instantBasicType);
+        Assert.assertEquals(BasicTypeInfo.INSTANT_TYPE_INFO, instantTypeInformation);
+    }
+
+    @Test
+    public void convertNullType() {
+        BasicType<Void> nullBasicType = BasicType.NULL;
+        TypeInformation<Void> nullTypeInformation =
+            BasicTypeConverter.NULL_CONVERTER.convert(nullBasicType);
+        Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, nullTypeInformation);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
new file mode 100644
index 00000000..778011c9
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/src/test/java/org/apache/seatunnel/translation/flink/types/PojoTypeConverterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.types;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DataType;
+import org.apache.seatunnel.api.table.type.ListType;
+import org.apache.seatunnel.api.table.type.PojoType;
+
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+public class PojoTypeConverterTest {
+
+    @Test
+    public void convert() {
+        PojoTypeConverter<MockPojo> pojoTypeConverter = new PojoTypeConverter<>();
+        Field[] fields = MockPojo.class.getDeclaredFields();
+        DataType<?>[] fieldTypes = {BasicType.STRING, new ListType<>(BasicType.INTEGER)};
+        PojoTypeInfo<MockPojo> pojoTypeInfo =
+            pojoTypeConverter.convert(new PojoType<>(MockPojo.class, fields, fieldTypes));
+        Assert.assertEquals(
+            TypeExtractor.createTypeInfo(MockPojo.class),
+            pojoTypeInfo);
+    }
+
+    public static class MockPojo {
+        private String stringField;
+        private List<Integer> listField;
+
+        public String getStringField() {
+            return stringField;
+        }
+
+        public void setStringField(String stringField) {
+            this.stringField = stringField;
+        }
+
+        public List<Integer> getListField() {
+            return listField;
+        }
+
+        public void setListField(List<Integer> listField) {
+            this.listField = listField;
+        }
+    }
+}