You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/14 08:36:35 UTC

[flink] 04/04: [hotfix][table-common] Update DataTypeUtils.transform for structured types

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5aa1d96f0e846f09a883d5a0b414048576e3c001
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 13 15:47:52 2020 +0200

    [hotfix][table-common] Update DataTypeUtils.transform for structured types
---
 .../flink/table/types/utils/DataTypeUtils.java     | 88 +++++++++++++++++-----
 .../types/inference/TypeTransformationsTest.java   | 38 ++++++++++
 2 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 29bb6a7..e1bdaf6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
@@ -39,13 +40,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -84,12 +88,17 @@ public final class DataTypeUtils {
 	}
 
 	/**
-	 * Transforms the given data type (can be nested) to a different data type using the given
-	 * transformations. The given transformations will be called in order.
+	 * Transforms the given data type to a different data type using the given transformations.
+	 *
+	 * <p>The transformations will be called in the given order. In case of constructed or composite
+	 * types, a transformation will be applied transitively to children first.
+	 *
+	 * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can be
+	 * transformed.
 	 *
 	 * @param typeToTransform data type to be transformed.
 	 * @param transformations the transformations to transform data type to another type.
-	 * @return the new data type,
+	 * @return the new data type
 	 */
 	public static DataType transform(DataType typeToTransform, TypeTransformation... transformations) {
 		Preconditions.checkArgument(transformations.length > 0, "transformations should not be empty.");
@@ -183,6 +192,12 @@ public final class DataTypeUtils {
 		}
 	}
 
+	/**
+	 * Transforms a {@link DataType}.
+	 *
+	 * <p>In case of constructed or composite types, a transformation will be applied transitively to
+	 * children first.
+	 */
 	private static class DataTypeTransformer implements DataTypeVisitor<DataType> {
 
 		private final TypeTransformation transformation;
@@ -198,9 +213,9 @@ public final class DataTypeUtils {
 
 		@Override
 		public DataType visit(CollectionDataType collectionDataType) {
-			DataType newElementType = collectionDataType.getElementDataType().accept(this);
-			LogicalType logicalType = collectionDataType.getLogicalType();
-			LogicalType newLogicalType;
+			final DataType newElementType = collectionDataType.getElementDataType().accept(this);
+			final LogicalType logicalType = collectionDataType.getLogicalType();
+			final LogicalType newLogicalType;
 			if (logicalType instanceof ArrayType) {
 				newLogicalType = new ArrayType(
 					logicalType.isNullable(),
@@ -217,37 +232,60 @@ public final class DataTypeUtils {
 
 		@Override
 		public DataType visit(FieldsDataType fieldsDataType) {
-			final List<DataType> newFields = fieldsDataType.getChildren().stream()
+			final List<DataType> newDataTypes = fieldsDataType.getChildren().stream()
 				.map(dt -> dt.accept(this))
 				.collect(Collectors.toList());
 
 			final LogicalType logicalType = fieldsDataType.getLogicalType();
 			final LogicalType newLogicalType;
 			if (logicalType instanceof RowType) {
-				final List<RowType.RowField> oldFields = ((RowType) logicalType).getFields();
-				final List<RowType.RowField> newRowFields = IntStream.range(0, oldFields.size())
+				final List<RowField> oldFields = ((RowType) logicalType).getFields();
+				final List<RowField> newFields = IntStream.range(0, oldFields.size())
 					.mapToObj(i ->
-						new RowType.RowField(
+						new RowField(
 							oldFields.get(i).getName(),
-							newFields.get(i).getLogicalType(),
+							newDataTypes.get(i).getLogicalType(),
 							oldFields.get(i).getDescription().orElse(null)))
 					.collect(Collectors.toList());
 
 				newLogicalType = new RowType(
 					logicalType.isNullable(),
-					newRowFields);
+					newFields);
+			} else if (logicalType instanceof StructuredType) {
+				final StructuredType structuredType = (StructuredType) logicalType;
+				if (structuredType.getSuperType().isPresent()) {
+					throw new UnsupportedOperationException("Hierarchies of structured types are not supported yet.");
+				}
+				final List<StructuredAttribute> oldAttributes = structuredType.getAttributes();
+				final List<StructuredAttribute> newAttributes = IntStream.range(0, oldAttributes.size())
+					.mapToObj(i ->
+						new StructuredAttribute(
+							oldAttributes.get(i).getName(),
+							newDataTypes.get(i).getLogicalType(),
+							oldAttributes.get(i).getDescription().orElse(null)))
+					.collect(Collectors.toList());
+
+				final StructuredType.Builder builder = createStructuredBuilder(structuredType);
+				builder.attributes(newAttributes);
+				builder.setNullable(structuredType.isNullable());
+				builder.setFinal(structuredType.isFinal());
+				builder.setInstantiable(structuredType.isInstantiable());
+				builder.comparision(structuredType.getComparision());
+				structuredType.getDescription().ifPresent(builder::description);
+
+				newLogicalType = builder.build();
 			} else {
 				throw new UnsupportedOperationException("Unsupported logical type : " + logicalType);
 			}
-			return transformation.transform(new FieldsDataType(newLogicalType, newFields));
+			return transformation.transform(new FieldsDataType(newLogicalType, newDataTypes));
 		}
 
 		@Override
 		public DataType visit(KeyValueDataType keyValueDataType) {
-			DataType newKeyType = keyValueDataType.getKeyDataType().accept(this);
-			DataType newValueType = keyValueDataType.getValueDataType().accept(this);
-			LogicalType logicalType = keyValueDataType.getLogicalType();
-			LogicalType newLogicalType;
+			final DataType newKeyType = keyValueDataType.getKeyDataType().accept(this);
+			final DataType newValueType = keyValueDataType.getValueDataType().accept(this);
+			final LogicalType logicalType = keyValueDataType.getLogicalType();
+			final LogicalType newLogicalType;
 			if (logicalType instanceof MapType) {
 				newLogicalType = new MapType(
 					logicalType.isNullable(),
@@ -258,6 +296,22 @@ public final class DataTypeUtils {
 			}
 			return transformation.transform(new KeyValueDataType(newLogicalType, newKeyType, newValueType));
 		}
+
+		// ----------------------------------------------------------------------------------------
+
+		private StructuredType.Builder createStructuredBuilder(StructuredType structuredType) {
+			final Optional<ObjectIdentifier> identifier = structuredType.getObjectIdentifier();
+			final Optional<Class<?>> implementationClass = structuredType.getImplementationClass();
+			if (identifier.isPresent() && implementationClass.isPresent()) {
+				return StructuredType.newBuilder(identifier.get(), implementationClass.get());
+			} else if (identifier.isPresent()) {
+				return StructuredType.newBuilder(identifier.get());
+			} else if (implementationClass.isPresent()) {
+				return StructuredType.newBuilder(implementationClass.get());
+			} else {
+				throw new IllegalArgumentException("Invalid structured type.");
+			}
+		}
 	}
 
 	private static TableSchema expandCompositeType(FieldsDataType dataType) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
index 64c0342..ff33d5e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -32,6 +34,7 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 
+import static org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS;
 import static org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal;
 import static org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw;
 import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes;
@@ -44,6 +47,22 @@ import static org.junit.Assert.assertEquals;
 public class TypeTransformationsTest {
 
 	@Test
+	public void testToInternal() {
+		DataType dataType = DataTypes.STRUCTURED(
+			SimplePojo.class,
+			DataTypes.FIELD("name", DataTypes.STRING()),
+			DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class)));
+
+		DataType expected = DataTypes.STRUCTURED(
+				SimplePojo.class,
+				DataTypes.FIELD("name", DataTypes.STRING().bridgedTo(StringData.class)),
+				DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(Integer.class)))
+			.bridgedTo(RowData.class);
+
+		assertEquals(expected, DataTypeUtils.transform(dataType, TO_INTERNAL_CLASS));
+	}
+
+	@Test
 	public void testTimeToSqlTypes() {
 		DataType dataType = DataTypes.ROW(
 			DataTypes.FIELD("a", DataTypes.STRING()),
@@ -130,6 +149,8 @@ public class TypeTransformationsTest {
 		assertEquals(expected, DataTypeUtils.transform(dataType, toNullable()));
 	}
 
+	// --------------------------------------------------------------------------------------------
+
 	private static DataType createLegacyDecimal() {
 		return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
 	}
@@ -137,4 +158,21 @@ public class TypeTransformationsTest {
 	private static DataType createLegacyRaw() {
 		return TypeConversions.fromLegacyInfoToDataType(Types.GENERIC(TypeTransformationsTest.class));
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Simple POJO for testing.
+	 */
+	public static class SimplePojo {
+		public final String name;
+		public final int count;
+
+		public SimplePojo(String name, int count) {
+			this.name = name;
+			this.count = count;
+		}
+	}
 }