You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/14 08:36:35 UTC
[flink] 04/04: [hotfix][table-common] Update DataTypeUtils.transform for structured types
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5aa1d96f0e846f09a883d5a0b414048576e3c001
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 13 15:47:52 2020 +0200
[hotfix][table-common] Update DataTypeUtils.transform for structured types
---
.../flink/table/types/utils/DataTypeUtils.java | 88 +++++++++++++++++-----
.../types/inference/TypeTransformationsTest.java | 38 ++++++++++
2 files changed, 109 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 29bb6a7..e1bdaf6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
@@ -39,13 +40,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.flink.util.Preconditions;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -84,12 +88,17 @@ public final class DataTypeUtils {
}
/**
- * Transforms the given data type (can be nested) to a different data type using the given
- * transformations. The given transformations will be called in order.
+ * Transforms the given data type to a different data type using the given transformations.
+ *
+ * <p>The transformations will be called in the given order. In case of constructed or composite
+ * types, a transformation will be applied transitively to children first.
+ *
+ * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can be
+ * transformed.
*
* @param typeToTransform data type to be transformed.
* @param transformations the transformations to transform data type to another type.
- * @return the new data type,
+ * @return the new data type
*/
public static DataType transform(DataType typeToTransform, TypeTransformation... transformations) {
Preconditions.checkArgument(transformations.length > 0, "transformations should not be empty.");
@@ -183,6 +192,12 @@ public final class DataTypeUtils {
}
}
+ /**
+ * Transforms a {@link DataType}.
+ *
+ * <p>In case of constructed or composite types, a transformation will be applied transitively to
+ * children first.
+ */
private static class DataTypeTransformer implements DataTypeVisitor<DataType> {
private final TypeTransformation transformation;
@@ -198,9 +213,9 @@ public final class DataTypeUtils {
@Override
public DataType visit(CollectionDataType collectionDataType) {
- DataType newElementType = collectionDataType.getElementDataType().accept(this);
- LogicalType logicalType = collectionDataType.getLogicalType();
- LogicalType newLogicalType;
+ final DataType newElementType = collectionDataType.getElementDataType().accept(this);
+ final LogicalType logicalType = collectionDataType.getLogicalType();
+ final LogicalType newLogicalType;
if (logicalType instanceof ArrayType) {
newLogicalType = new ArrayType(
logicalType.isNullable(),
@@ -217,37 +232,60 @@ public final class DataTypeUtils {
@Override
public DataType visit(FieldsDataType fieldsDataType) {
- final List<DataType> newFields = fieldsDataType.getChildren().stream()
+ final List<DataType> newDataTypes = fieldsDataType.getChildren().stream() // children are transformed first; the parent is rebuilt from them below
.map(dt -> dt.accept(this))
.collect(Collectors.toList());
final LogicalType logicalType = fieldsDataType.getLogicalType();
final LogicalType newLogicalType;
if (logicalType instanceof RowType) {
- final List<RowType.RowField> oldFields = ((RowType) logicalType).getFields();
- final List<RowType.RowField> newRowFields = IntStream.range(0, oldFields.size())
+ final List<RowField> oldFields = ((RowType) logicalType).getFields();
+ final List<RowField> newFields = IntStream.range(0, oldFields.size()) // old name and description, logical type of the visited child at the same index
.mapToObj(i ->
- new RowType.RowField(
+ new RowField(
oldFields.get(i).getName(),
- newFields.get(i).getLogicalType(),
+ newDataTypes.get(i).getLogicalType(),
oldFields.get(i).getDescription().orElse(null)))
.collect(Collectors.toList());
newLogicalType = new RowType(
logicalType.isNullable(),
- newRowFields);
+ newFields);
+ } else if (logicalType instanceof StructuredType) { // structured types are rebuilt attribute by attribute, analogous to rows
+ final StructuredType structuredType = (StructuredType) logicalType;
+ if (structuredType.getSuperType().isPresent()) { // types that declare a super type are rejected
+ throw new UnsupportedOperationException("Hierarchies of structured types are not supported yet.");
+ }
+ final List<StructuredAttribute> oldAttributes = structuredType.getAttributes();
+ final List<StructuredAttribute> newAttributes = IntStream.range(0, oldAttributes.size()) // old name and description, logical type of the visited child at the same index
+ .mapToObj(i ->
+ new StructuredAttribute(
+ oldAttributes.get(i).getName(),
+ newDataTypes.get(i).getLogicalType(),
+ oldAttributes.get(i).getDescription().orElse(null)))
+ .collect(Collectors.toList());
+
+ final StructuredType.Builder builder = createStructuredBuilder(structuredType); // builder seeded with the original identifier and/or implementation class
+ builder.attributes(newAttributes);
+ builder.setNullable(structuredType.isNullable()); // the remaining properties are copied over from the original type
+ builder.setFinal(structuredType.isFinal());
+ builder.setInstantiable(structuredType.isInstantiable());
+ builder.comparision(structuredType.getComparision());
+ structuredType.getDescription().ifPresent(builder::description); // description is optional and only set when present
+
+ newLogicalType = builder.build();
} else {
throw new UnsupportedOperationException("Unsupported logical type : " + logicalType);
}
- return transformation.transform(new FieldsDataType(newLogicalType, newFields));
+ return transformation.transform(new FieldsDataType(newLogicalType, newDataTypes)); // the transformation is applied to the rebuilt type, i.e. after its children
}
@Override
public DataType visit(KeyValueDataType keyValueDataType) {
- DataType newKeyType = keyValueDataType.getKeyDataType().accept(this);
- DataType newValueType = keyValueDataType.getValueDataType().accept(this);
- LogicalType logicalType = keyValueDataType.getLogicalType();
- LogicalType newLogicalType;
+ final DataType newKeyType = keyValueDataType.getKeyDataType().accept(this);
+ final DataType newValueType = keyValueDataType.getValueDataType().accept(this);
+ final LogicalType logicalType = keyValueDataType.getLogicalType();
+ final LogicalType newLogicalType;
if (logicalType instanceof MapType) {
newLogicalType = new MapType(
logicalType.isNullable(),
@@ -258,6 +296,22 @@ public final class DataTypeUtils {
}
return transformation.transform(new KeyValueDataType(newLogicalType, newKeyType, newValueType));
}
+
+ // ----------------------------------------------------------------------------------------
+
+ /**
+  * Creates a {@link StructuredType.Builder} seeded with the identity of the given type.
+  *
+  * <p>The {@code StructuredType.newBuilder} overload is selected by which of the object
+  * identifier and the implementation class the given type provides.
+  *
+  * @param structuredType type whose identifier and/or implementation class seed the builder
+  * @return a new builder for the same identifier and/or implementation class
+  * @throws IllegalArgumentException if the type provides neither an identifier nor an
+  *     implementation class
+  */
+ private StructuredType.Builder createStructuredBuilder(StructuredType structuredType) {
+     final Optional<ObjectIdentifier> objectIdentifier = structuredType.getObjectIdentifier();
+     final Optional<Class<?>> implClass = structuredType.getImplementationClass();
+
+     // Neither an identifier nor an implementation class: no overload applies.
+     if (!objectIdentifier.isPresent() && !implClass.isPresent()) {
+         throw new IllegalArgumentException("Invalid structured type.");
+     }
+     if (!implClass.isPresent()) {
+         return StructuredType.newBuilder(objectIdentifier.get());
+     }
+     if (!objectIdentifier.isPresent()) {
+         return StructuredType.newBuilder(implClass.get());
+     }
+     return StructuredType.newBuilder(objectIdentifier.get(), implClass.get());
+ }
}
private static TableSchema expandCompositeType(FieldsDataType dataType) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
index 64c0342..ff33d5e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
@@ -32,6 +34,7 @@ import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import static org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS;
import static org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal;
import static org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw;
import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes;
@@ -44,6 +47,22 @@ import static org.junit.Assert.assertEquals;
public class TypeTransformationsTest {
@Test
+ public void testToInternal() {
+     // Input: structured type backed by SimplePojo; "name" has no explicit bridging and
+     // the non-null "count" is bridged to the primitive int.
+     final DataType pojoType = DataTypes.STRUCTURED(
+         SimplePojo.class,
+         DataTypes.FIELD("name", DataTypes.STRING()),
+         DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class)));
+
+     final DataType actual = DataTypeUtils.transform(pojoType, TO_INTERNAL_CLASS);
+
+     // Expected: "name" bridged to StringData, "count" bridged to Integer, and the
+     // structured type itself bridged to RowData.
+     final DataType internalType = DataTypes.STRUCTURED(
+         SimplePojo.class,
+         DataTypes.FIELD("name", DataTypes.STRING().bridgedTo(StringData.class)),
+         DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(Integer.class)))
+         .bridgedTo(RowData.class);
+
+     assertEquals(internalType, actual);
+ }
+
+ @Test
public void testTimeToSqlTypes() {
DataType dataType = DataTypes.ROW(
DataTypes.FIELD("a", DataTypes.STRING()),
@@ -130,6 +149,8 @@ public class TypeTransformationsTest {
assertEquals(expected, DataTypeUtils.transform(dataType, toNullable()));
}
+ // --------------------------------------------------------------------------------------------
+
private static DataType createLegacyDecimal() {
return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
}
@@ -137,4 +158,21 @@ public class TypeTransformationsTest {
private static DataType createLegacyRaw() {
return TypeConversions.fromLegacyInfoToDataType(Types.GENERIC(TypeTransformationsTest.class));
}
+
+ // --------------------------------------------------------------------------------------------
+ // Helper classes
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Simple POJO for testing.
+ *
+ * <p>Exposes two public final fields, {@code name} and {@code count}, that are assigned once in
+ * the constructor. It is passed as the implementation class of the structured type built in
+ * {@code testToInternal}.
+ */
+ public static class SimplePojo {
+ public final String name;
+ public final int count;
+
+ public SimplePojo(String name, int count) {
+ this.name = name;
+ this.count = count;
+ }
+ }
}