You are viewing a plain text version of this content. The canonical link to it was lost in this plain-text export.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/25 08:49:08 UTC

[flink] branch master updated: [FLINK-18417][table] Support List as a conversion class for ARRAY

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6834ed1  [FLINK-18417][table] Support List as a conversion class for ARRAY
6834ed1 is described below

commit 6834ed181aa9edda6b9c7fb61696de93170a88d1
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue Jun 23 17:16:35 2020 +0200

    [FLINK-18417][table] Support List as a conversion class for ARRAY
    
    This closes #12765.
---
 docs/dev/table/types.md                            |  2 +
 docs/dev/table/types.zh.md                         |  2 +
 .../table/types/extraction/DataTypeExtractor.java  | 40 ++++++++--
 .../flink/table/types/logical/ArrayType.java       |  4 +
 .../apache/flink/table/types/LogicalTypesTest.java |  6 +-
 .../types/extraction/DataTypeExtractorTest.java    | 16 ++++
 .../table/data/conversion/ArrayListConverter.java  | 86 ++++++++++++++++++++++
 .../data/conversion/ArrayObjectArrayConverter.java |  4 +-
 .../data/conversion/DataStructureConverters.java   |  8 ++
 .../table/data/DataStructureConvertersTest.java    | 55 ++++++++++++--
 10 files changed, 207 insertions(+), 16 deletions(-)

diff --git a/docs/dev/table/types.md b/docs/dev/table/types.md
index 16991d6..698deca 100644
--- a/docs/dev/table/types.md
+++ b/docs/dev/table/types.md
@@ -1007,6 +1007,8 @@ equivalent to `ARRAY<INT>`.
 | Java Type                              | Input | Output | Remarks                           |
 |:---------------------------------------|:-----:|:------:|:----------------------------------|
 |*t*`[]`                                 | (X)   | (X)    | Depends on the subtype. *Default* |
+| `java.util.List<t>`                    | X     | X      |                                   |
+| *subclass* of `java.util.List<t>`      | X     |        |                                   |
 |`org.apache.flink.table.data.ArrayData` | X     | X      | Internal data structure.          |
 
 #### `MAP`
diff --git a/docs/dev/table/types.zh.md b/docs/dev/table/types.zh.md
index 252ff02..582bcd4 100644
--- a/docs/dev/table/types.zh.md
+++ b/docs/dev/table/types.zh.md
@@ -925,6 +925,8 @@ DataTypes.ARRAY(t)
 | Java 类型 | 输入 | 输出 | 备注                           |
 |:----------|:-----:|:------:|:----------------------------------|
 |*t*`[]`    | (X)   | (X)    | 依赖于子类型。 *缺省* |
+|`java.util.List<t>`    | X   | X    |            |
+| `java.util.List<t>` 的*子类*    | X     |        |                  |
 
 #### `MAP`
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index ff353fd..3b20183 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -314,15 +314,35 @@ public final class DataTypeExtractor {
 			return DataTypes.ARRAY(
 				extractDataTypeOrRaw(template, typeHierarchy, genericArray.getGenericComponentType()));
 		}
+
+		final Class<?> clazz = toClass(type);
+		if (clazz == null) {
+			return null;
+		}
+
 		// for my.custom.Pojo[][]
-		else if (type instanceof Class) {
-			final Class<?> clazz = (Class<?>) type;
-			if (clazz.isArray()) {
-				return DataTypes.ARRAY(
-					extractDataTypeOrRaw(template, typeHierarchy, clazz.getComponentType()));
-			}
+		if (clazz.isArray()) {
+			return DataTypes.ARRAY(
+				extractDataTypeOrRaw(template, typeHierarchy, clazz.getComponentType()));
 		}
-		return null;
+
+		// for List<T>
+		// we only allow List here (not a subclass) because we cannot guarantee more specific
+		// data structures after conversion
+		if (clazz != List.class) {
+			return null;
+		}
+		if (!(type instanceof ParameterizedType)) {
+			throw extractionError(
+				"The class '%s' needs generic parameters for an array type.",
+				List.class.getName());
+		}
+		final ParameterizedType parameterizedType = (ParameterizedType) type;
+		final DataType element = extractDataTypeOrRaw(
+			template,
+			typeHierarchy,
+			parameterizedType.getActualTypeArguments()[0]);
+		return DataTypes.ARRAY(element).bridgedTo(List.class);
 	}
 
 	private @Nullable DataType extractEnforcedRawType(DataTypeTemplate template, Type type) {
@@ -429,11 +449,15 @@ public final class DataTypeExtractor {
 
 	private @Nullable DataType extractMapType(DataTypeTemplate template, List<Type> typeHierarchy, Type type) {
 		final Class<?> clazz = toClass(type);
+		// we only allow Map here (not a subclass) because we cannot guarantee more specific
+		// data structures after conversion
 		if (clazz != Map.class) {
 			return null;
 		}
 		if (!(type instanceof ParameterizedType)) {
-			throw extractionError("Raw map type needs generic parameters.");
+			throw extractionError(
+				"The class '%s' needs generic parameters for a map type.",
+				Map.class.getName());
 		}
 		final ParameterizedType parameterizedType = (ParameterizedType) type;
 		final DataType key = extractDataTypeOrRaw(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
index f563863..2d87279 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
@@ -42,6 +42,7 @@ public final class ArrayType extends LogicalType {
 	public static final String FORMAT = "ARRAY<%s>";
 
 	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
+		List.class.getName(),
 		ArrayData.class.getName());
 
 	private final LogicalType elementType;
@@ -76,6 +77,9 @@ public final class ArrayType extends LogicalType {
 
 	@Override
 	public boolean supportsInputConversion(Class<?> clazz) {
+		if (List.class.isAssignableFrom(clazz)) {
+			return true;
+		}
 		if (INPUT_OUTPUT_CONVERSION.contains(clazz.getName())) {
 			return true;
 		}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index eac4014..85395b9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -68,9 +68,11 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -405,8 +407,8 @@ public class LogicalTypesTest {
 			new ArrayType(new TimestampType()),
 			"ARRAY<TIMESTAMP(6)>",
 			"ARRAY<TIMESTAMP(6)>",
-			new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class},
-			new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class},
+			new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class, List.class, ArrayList.class},
+			new Class[]{java.sql.Timestamp[].class, java.time.LocalDateTime[].class, List.class},
 			new LogicalType[]{new TimestampType()},
 			new ArrayType(new SmallIntType())
 		);
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index 2d60a76..87322c6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -305,6 +305,16 @@ public class DataTypeExtractorTest {
 					"Could not extract a data type from 'java.util.HashMap<java.lang.Integer, java.lang.String>'. " +
 						"Interpreting it as a structured type was also not successful."),
 
+			TestSpec
+				.forGeneric(
+					"ARRAY type with List conversion class",
+					TableFunction.class, 0, TableFunctionWithList.class)
+				.expectDataType(
+					DataTypes.ARRAY(
+						DataTypes.ARRAY(DataTypes.STRING()).bridgedTo(List.class)
+					).bridgedTo(List.class)
+				),
+
 			// simple structured type without RAW type
 			TestSpec
 				.forType(SimplePojo.class)
@@ -744,6 +754,12 @@ public class DataTypeExtractorTest {
 
 	// --------------------------------------------------------------------------------------------
 
+	private static class TableFunctionWithList extends TableFunction<List<List<String>>> {
+		// intentionally empty: the test only extracts the generic parameter List<List<String>>
+	}
+
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Complex POJO with raw types.
 	 */
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
new file mode 100644
index 0000000..8acd078
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
+
+/**
+ * Converter for {@link ArrayType} of {@link List} external type; elements are converted via an {@link ArrayObjectArrayConverter}.
+ */
+@Internal
+public class ArrayListConverter<E> implements DataStructureConverter<ArrayData, List<E>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final E[] arrayKind; // zero-length template array; its component type sets the runtime type of List#toArray's result
+
+	private final ArrayObjectArrayConverter<E> elementsConverter; // converts the intermediate element array to and from ArrayData
+
+	private ArrayListConverter(E[] arrayKind, ArrayObjectArrayConverter<E> elementsConverter) {
+		this.arrayKind = arrayKind;
+		this.elementsConverter = elementsConverter;
+	}
+
+	@Override
+	public void open(ClassLoader classLoader) {
+		elementsConverter.open(classLoader);
+	}
+
+	@Override
+	public ArrayData toInternal(List<E> external) {
+		return elementsConverter.toInternal(external.toArray(arrayKind)); // yields an array with the runtime type of arrayKind
+	}
+
+	@Override
+	public List<E> toExternal(ArrayData internal) {
+		return Arrays.asList(elementsConverter.toExternal(internal)); // NOTE(review): fixed-size view; add/remove throw UnsupportedOperationException
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Factory method
+	// --------------------------------------------------------------------------------------------
+
+	public static ArrayListConverter<?> create(DataType dataType) {
+		final DataType elementDataType = dataType.getChildren().get(0); // presumably the ARRAY's element type (its only child) — confirm
+		return new ArrayListConverter<>(
+			createObjectArrayKind(elementDataType.getConversionClass()),
+			ArrayObjectArrayConverter.createForElement(elementDataType));
+	}
+
+	/**
+	 * Creates an empty array of the (boxed) element class that serves as the target type for {@link List#toArray(Object[])}.
+	 */
+	private static Object[] createObjectArrayKind(Class<?> elementClazz) {
+		// a primitive component type such as int would yield int[], which is not an Object[], so box it first
+		if (elementClazz.isPrimitive()) {
+			return (Object[]) Array.newInstance(primitiveToWrapper(elementClazz), 0);
+		}
+		// reference component types already yield an Object[], e.g. int[][] and Integer[]
+		return (Object[]) Array.newInstance(elementClazz, 0);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index c191c55..c8c7192 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -34,6 +34,8 @@ import org.apache.commons.lang3.ArrayUtils;
 import java.io.Serializable;
 import java.lang.reflect.Array;
 
+import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
+
 /**
  * Converter for {@link ArrayType} of nested primitive or object arrays external types.
  */
@@ -168,7 +170,7 @@ public class ArrayObjectArrayConverter<E> implements DataStructureConverter<Arra
 	public static <E> ArrayObjectArrayConverter<E> createForElement(DataType elementDataType) {
 		final LogicalType elementType = elementDataType.getLogicalType();
 		return new ArrayObjectArrayConverter<>(
-			(Class<E>) elementDataType.getConversionClass(),
+			(Class<E>) primitiveToWrapper(elementDataType.getConversionClass()),
 			BinaryArrayData.calculateFixLengthPartSize(elementType),
 			BinaryArrayWriter.createNullSetter(elementType),
 			BinaryWriter.createValueSetter(elementType),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
index 2548733..89dcea6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
@@ -34,6 +34,7 @@ import org.apache.flink.types.Row;
 
 import java.math.BigDecimal;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Supplier;
@@ -146,10 +147,17 @@ public final class DataStructureConverters {
 		// special cases
 		switch (logicalType.getTypeRoot()) {
 			case ARRAY:
+				// for subclasses of List
+				if (List.class.isAssignableFrom(dataType.getConversionClass())) {
+					return ArrayListConverter.create(dataType);
+				}
+				// for non-primitive arrays
 				return ArrayObjectArrayConverter.create(dataType);
 			case MULTISET:
+				// for subclasses of Map
 				return MapMapConverter.createForMultisetType(dataType);
 			case MAP:
+				// for subclasses of Map
 				return MapMapConverter.createForMapType(dataType);
 			case DISTINCT_TYPE:
 				return getConverterInternal(dataType.getChildren().get(0));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 29710fa..bf423bc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -53,8 +53,10 @@ import java.time.Period;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -205,12 +207,14 @@ public class DataStructureConvertersTest {
 			TestSpec
 				.forDataType(ARRAY(BOOLEAN()))
 				.convertedTo(Boolean[].class, new Boolean[]{true, null, true, true})
+				.convertedTo(List.class, Arrays.asList(true, null, true, true))
 				.convertedTo(ArrayData.class, new GenericArrayData(new Boolean[]{true, null, true, true})),
 
 			TestSpec
-				.forDataType(ARRAY(INT().notNull()))
+				.forDataType(ARRAY(INT().notNull().bridgedTo(int.class))) // int.class should not have an impact
 				.convertedTo(int[].class, new int[]{1, 2, 3, 4})
-				.convertedTo(Integer[].class, new Integer[]{1, 2, 3, 4}),
+				.convertedTo(Integer[].class, new Integer[]{1, 2, 3, 4})
+				.convertedTo(List.class, new LinkedList<>(Arrays.asList(1, 2, 3, 4))), // test List that is not backed by an array
 
 			// arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for simplicity
 
@@ -218,10 +222,13 @@ public class DataStructureConvertersTest {
 				.forDataType(ARRAY(DATE()))
 				.convertedTo(
 					LocalDate[].class,
-					new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")}),
+					new LocalDate[]{null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")})
+				.convertedTo(
+					List.class,
+					Arrays.asList(null, LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12"))),
 
 			TestSpec
-				.forDataType(MAP(INT(), BOOLEAN()))
+				.forDataType(MAP(INT().bridgedTo(int.class), BOOLEAN())) // int.class should not have an impact
 				.convertedTo(Map.class, createIdentityMap())
 				.convertedTo(MapData.class, new GenericMapData(createIdentityMap())),
 
@@ -350,7 +357,16 @@ public class DataStructureConvertersTest {
 					new Row[] {
 						Row.of(null, null),
 						Row.of(new PojoWithImmutableFields(10, "Bob"), null)
-					})
+					}),
+
+			TestSpec
+				.forDataType(DataTypes.of(PojoWithList.class))
+				.convertedTo(
+					PojoWithList.class,
+					new PojoWithList(Arrays.asList(Arrays.asList(1.0, null, 2.0, null), Collections.emptyList(), null)))
+				.convertedTo(
+					Row.class,
+					Row.of(Arrays.asList(Arrays.asList(1.0, null, 2.0, null), Collections.emptyList(), null)))
 		);
 	}
 
@@ -757,4 +773,33 @@ public class DataStructureConvertersTest {
 			return result;
 		}
 	}
+
+	/**
+	 * POJO with a nested {@link List} field for testing List conversion classes inside structured types.
+	 */
+	public static class PojoWithList {
+
+		public List<List<Double>> factors;
+
+		public PojoWithList(List<List<Double>> factors) {
+			this.factors = factors;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			PojoWithList that = (PojoWithList) o;
+			return Objects.equals(factors, that.factors);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(factors);
+		}
+	}
 }