You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/25 08:49:08 UTC
[flink] branch master updated: [FLINK-18417][table] Support List as a conversion class for ARRAY
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6834ed1 [FLINK-18417][table] Support List as a conversion class for ARRAY
6834ed1 is described below
commit 6834ed181aa9edda6b9c7fb61696de93170a88d1
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Jun 23 17:16:35 2020 +0200
[FLINK-18417][table] Support List as a conversion class for ARRAY
This closes #12765.
---
docs/dev/table/types.md | 2 +
docs/dev/table/types.zh.md | 2 +
.../table/types/extraction/DataTypeExtractor.java | 40 ++++++++--
.../flink/table/types/logical/ArrayType.java | 4 +
.../apache/flink/table/types/LogicalTypesTest.java | 6 +-
.../types/extraction/DataTypeExtractorTest.java | 16 ++++
.../table/data/conversion/ArrayListConverter.java | 86 ++++++++++++++++++++++
.../data/conversion/ArrayObjectArrayConverter.java | 4 +-
.../data/conversion/DataStructureConverters.java | 8 ++
.../table/data/DataStructureConvertersTest.java | 55 ++++++++++++--
10 files changed, 207 insertions(+), 16 deletions(-)
diff --git a/docs/dev/table/types.md b/docs/dev/table/types.md
index 16991d6..698deca 100644
--- a/docs/dev/table/types.md
+++ b/docs/dev/table/types.md
@@ -1007,6 +1007,8 @@ equivalent to `ARRAY<INT>`.
| Java Type | Input | Output | Remarks |
|:---------------------------------------|:-----:|:------:|:----------------------------------|
|*t*`[]` | (X) | (X) | Depends on the subtype. *Default* |
+| `java.util.List<t>` | X | X | |
+| *subclass* of `java.util.List<t>` | X | | |
|`org.apache.flink.table.data.ArrayData` | X | X | Internal data structure. |
#### `MAP`
diff --git a/docs/dev/table/types.zh.md b/docs/dev/table/types.zh.md
index 252ff02..582bcd4 100644
--- a/docs/dev/table/types.zh.md
+++ b/docs/dev/table/types.zh.md
@@ -925,6 +925,8 @@ DataTypes.ARRAY(t)
| Java 类型 | 输入 | 输出 | 备注 |
|:----------|:-----:|:------:|:----------------------------------|
|*t*`[]` | (X) | (X) | 依赖于子类型。 *缺省* |
+|`java.util.List<t>` | X | X | |
+| `java.util.List<t>` 的*子类* | X | | |
#### `MAP`
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index ff353fd..3b20183 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -314,15 +314,35 @@ public final class DataTypeExtractor {
return DataTypes.ARRAY(
extractDataTypeOrRaw(template, typeHierarchy, genericArray.getGenericComponentType()));
}
+
+ final Class<?> clazz = toClass(type);
+ if (clazz == null) {
+ return null;
+ }
+
// for my.custom.Pojo[][]
- else if (type instanceof Class) {
- final Class<?> clazz = (Class<?>) type;
- if (clazz.isArray()) {
- return DataTypes.ARRAY(
- extractDataTypeOrRaw(template, typeHierarchy, clazz.getComponentType()));
- }
+ if (clazz.isArray()) {
+ return DataTypes.ARRAY(
+ extractDataTypeOrRaw(template, typeHierarchy, clazz.getComponentType()));
}
- return null;
+
+ // for List<T>
+ // we only allow List here (not a subclass) because we cannot guarantee more specific
+ // data structures after conversion
+ if (clazz != List.class) {
+ return null;
+ }
+ if (!(type instanceof ParameterizedType)) {
+ throw extractionError(
+ "The class '%s' needs generic parameters for an array type.",
+ List.class.getName());
+ }
+ final ParameterizedType parameterizedType = (ParameterizedType) type;
+ final DataType element = extractDataTypeOrRaw(
+ template,
+ typeHierarchy,
+ parameterizedType.getActualTypeArguments()[0]);
+ return DataTypes.ARRAY(element).bridgedTo(List.class);
}
private @Nullable DataType extractEnforcedRawType(DataTypeTemplate template, Type type) {
@@ -429,11 +449,15 @@ public final class DataTypeExtractor {
private @Nullable DataType extractMapType(DataTypeTemplate template, List<Type> typeHierarchy, Type type) {
final Class<?> clazz = toClass(type);
+ // we only allow Map here (not a subclass) because we cannot guarantee more specific
+ // data structures after conversion
if (clazz != Map.class) {
return null;
}
if (!(type instanceof ParameterizedType)) {
- throw extractionError("Raw map type needs generic parameters.");
+ throw extractionError(
+ "The class '%s' needs generic parameters for a map type.",
+ Map.class.getName());
}
final ParameterizedType parameterizedType = (ParameterizedType) type;
final DataType key = extractDataTypeOrRaw(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
index f563863..2d87279 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
@@ -42,6 +42,7 @@ public final class ArrayType extends LogicalType {
public static final String FORMAT = "ARRAY<%s>";
private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+ List.class.getName(),
ArrayData.class.getName());
private final LogicalType elementType;
@@ -76,6 +77,9 @@ public final class ArrayType extends LogicalType {
@Override
public boolean supportsInputConversion(Class<?> clazz) {
+ if (List.class.isAssignableFrom(clazz)) {
+ return true;
+ }
if (INPUT_OUTPUT_CONVERSION.contains(clazz.getName())) {
return true;
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index eac4014..85395b9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -68,9 +68,11 @@ import org.junit.Test;
import java.math.BigDecimal;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -405,8 +407,8 @@ public class LogicalTypesTest {
new ArrayType(new TimestampType()),
"ARRAY<TIMESTAMP(6)>",
"ARRAY<TIMESTAMP(6)>",
- new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class},
- new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class},
+ new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class, List.class, ArrayList.class},
+ new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class, List.class},
new LogicalType[]{new TimestampType()},
new ArrayType(new SmallIntType())
);
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index 2d60a76..87322c6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -305,6 +305,16 @@ public class DataTypeExtractorTest {
"Could not extract a data type from 'java.util.HashMap<java.lang.Integer, java.lang.String>'. " +
"Interpreting it as a structured type was also not successful."),
+ TestSpec
+ .forGeneric(
+ "ARRAY type with List conversion class",
+ TableFunction.class, 0, TableFunctionWithList.class)
+ .expectDataType(
+ DataTypes.ARRAY(
+ DataTypes.ARRAY(DataTypes.STRING()).bridgedTo(List.class)
+ ).bridgedTo(List.class)
+ ),
+
// simple structured type without RAW type
TestSpec
.forType(SimplePojo.class)
@@ -744,6 +754,12 @@ public class DataTypeExtractorTest {
// --------------------------------------------------------------------------------------------
+ private static class TableFunctionWithList extends TableFunction<List<List<String>>> {
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+
/**
* Complex POJO with raw types.
*/
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
new file mode 100644
index 0000000..8acd078
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
+
+/**
+ * Converter for {@link ArrayType} of {@link List} external type.
+ */
+@Internal
+public class ArrayListConverter<E> implements DataStructureConverter<ArrayData, List<E>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final E[] arrayKind;
+
+ private final ArrayObjectArrayConverter<E> elementsConverter;
+
+ private ArrayListConverter(E[] arrayKind, ArrayObjectArrayConverter<E> elementsConverter) {
+ this.arrayKind = arrayKind;
+ this.elementsConverter = elementsConverter;
+ }
+
+ @Override
+ public void open(ClassLoader classLoader) {
+ elementsConverter.open(classLoader);
+ }
+
+ @Override
+ public ArrayData toInternal(List<E> external) {
+ return elementsConverter.toInternal(external.toArray(arrayKind));
+ }
+
+ @Override
+ public List<E> toExternal(ArrayData internal) {
+ return Arrays.asList(elementsConverter.toExternal(internal));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Factory method
+ // --------------------------------------------------------------------------------------------
+
+ public static ArrayListConverter<?> create(DataType dataType) {
+ final DataType elementDataType = dataType.getChildren().get(0);
+ return new ArrayListConverter<>(
+ createObjectArrayKind(elementDataType.getConversionClass()),
+ ArrayObjectArrayConverter.createForElement(elementDataType));
+ }
+
+ /**
+ * Creates the kind of array for {@link List#toArray(Object[])}.
+ */
+ private static Object[] createObjectArrayKind(Class<?> elementClazz) {
+ // e.g. int[] is not an Object[]
+ if (elementClazz.isPrimitive()) {
+ return (Object[]) Array.newInstance(primitiveToWrapper(elementClazz), 0);
+ }
+ // e.g. int[][] and Integer[] are Object[]
+ return (Object[]) Array.newInstance(elementClazz, 0);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index c191c55..c8c7192 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -34,6 +34,8 @@ import org.apache.commons.lang3.ArrayUtils;
import java.io.Serializable;
import java.lang.reflect.Array;
+import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
+
/**
* Converter for {@link ArrayType} of nested primitive or object arrays external types.
*/
@@ -168,7 +170,7 @@ public class ArrayObjectArrayConverter<E> implements DataStructureConverter<Arra
public static <E> ArrayObjectArrayConverter<E> createForElement(DataType elementDataType) {
final LogicalType elementType = elementDataType.getLogicalType();
return new ArrayObjectArrayConverter<>(
- (Class<E>) elementDataType.getConversionClass(),
+ (Class<E>) primitiveToWrapper(elementDataType.getConversionClass()),
BinaryArrayData.calculateFixLengthPartSize(elementType),
BinaryArrayWriter.createNullSetter(elementType),
BinaryWriter.createValueSetter(elementType),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
index 2548733..89dcea6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
@@ -34,6 +34,7 @@ import org.apache.flink.types.Row;
import java.math.BigDecimal;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
@@ -146,10 +147,17 @@ public final class DataStructureConverters {
// special cases
switch (logicalType.getTypeRoot()) {
case ARRAY:
+ // for subclasses of List
+ if (List.class.isAssignableFrom(dataType.getConversionClass())) {
+ return ArrayListConverter.create(dataType);
+ }
+ // for non-primitive arrays
return ArrayObjectArrayConverter.create(dataType);
case MULTISET:
+ // for subclasses of Map
return MapMapConverter.createForMultisetType(dataType);
case MAP:
+ // for subclasses of Map
return MapMapConverter.createForMapType(dataType);
case DISTINCT_TYPE:
return getConverterInternal(dataType.getChildren().get(0));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 29710fa..bf423bc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -53,8 +53,10 @@ import java.time.Period;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -205,12 +207,14 @@ public class DataStructureConvertersTest {
TestSpec
.forDataType(ARRAY(BOOLEAN()))
.convertedTo(Boolean[].class, new Boolean[]{true, null, true, true})
+ .convertedTo(List.class, Arrays.asList(true, null, true, true))
.convertedTo(ArrayData.class, new GenericArrayData(new Boolean[]{true, null, true, true})),
TestSpec
- .forDataType(ARRAY(INT().notNull()))
+ .forDataType(ARRAY(INT().notNull().bridgedTo(int.class))) // int.class should not have an impact
.convertedTo(int[].class, new int[]{1, 2, 3, 4})
- .convertedTo(Integer[].class, new Integer[]{1, 2, 3, 4}),
+ .convertedTo(Integer[].class, new Integer[]{1, 2, 3, 4})
+ .convertedTo(List.class, new LinkedList<>(Arrays.asList(1, 2, 3, 4))), // test List that is not backed by an array
// arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
@@ -218,10 +222,13 @@ public class DataStructureConvertersTest {
.forDataType(ARRAY(DATE()))
.convertedTo(
LocalDate[].class,
- new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")}),
+ new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")})
+ .convertedTo(
+ List.class,
+ Arrays.asList(null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12"))),
TestSpec
- .forDataType(MAP(INT(), BOOLEAN()))
+ .forDataType(MAP(INT().bridgedTo(int.class), BOOLEAN())) // int.class should not have an impact
.convertedTo(Map.class, createIdentityMap())
.convertedTo(MapData.class, new GenericMapData(createIdentityMap())),
@@ -350,7 +357,16 @@ public class DataStructureConvertersTest {
new Row[] {
Row.of(null, null),
Row.of(new PojoWithImmutableFields(10, "Bob"), null)
- })
+ }),
+
+ TestSpec
+ .forDataType(DataTypes.of(PojoWithList.class))
+ .convertedTo(
+ PojoWithList.class,
+ new PojoWithList(Arrays.asList(Arrays.asList(1.0, null, 2.0, null), Collections.emptyList(), null)))
+ .convertedTo(
+ Row.class,
+ Row.of(Arrays.asList(Arrays.asList(1.0, null, 2.0, null), Collections.emptyList(), null)))
);
}
@@ -757,4 +773,33 @@ public class DataStructureConvertersTest {
return result;
}
}
+
+ /**
+ * Pojo with {@link List}.
+ */
+ public static class PojoWithList {
+
+ public List<List<Double>> factors;
+
+ public PojoWithList(List<List<Double>> factors) {
+ this.factors = factors;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PojoWithList that = (PojoWithList) o;
+ return Objects.equals(factors, that.factors);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(factors);
+ }
+ }
}