You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/05/05 18:05:04 UTC
[flink] branch release-1.10 updated: [FLINK-17313] Fix type
validation error when sink table ddl contains columns with precision of
decimal/varchar (#11993)
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new ff7b4f6 [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)
ff7b4f6 is described below
commit ff7b4f6f0e073477b7a93dbda8243e7fc8647f50
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue May 5 20:04:20 2020 +0200
[FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)
Co-authored-by: Terry Wang <zj...@foxmail.com>
---
.../apache/flink/table/utils/TypeMappingUtils.java | 115 ++++++++++++++++-----
.../flink/table/utils/TypeMappingUtilsTest.java | 68 ++++++++++++
2 files changed, 159 insertions(+), 24 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
index 8239100..e284787 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
@@ -32,6 +32,8 @@ import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -46,7 +48,9 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
/**
* Utility methods for dealing with field types in {@link org.apache.flink.table.sources.TableSource}
@@ -157,19 +161,35 @@ public final class TypeMappingUtils {
String physicalFieldName,
String logicalFieldName,
boolean isSource) {
- checkIfCompatible(
- physicalFieldType,
- logicalFieldType,
- (cause) -> new ValidationException(
+ Function<Throwable, ValidationException> exceptionSupplier = (cause) ->
+ new ValidationException(
String.format(
"Type %s of table field '%s' does not match with " +
- "the physical type %s of the '%s' field of the %s.",
+ "the physical type %s of the '%s' field of the %s type.",
logicalFieldType,
logicalFieldName,
physicalFieldType,
physicalFieldName,
- isSource ? "TableSource return type" : "TableSink consumed type"),
- cause));
+ isSource ? "TableSource return" : "TableSink consumed"),
+ cause);
+ try {
+ final boolean typesCompatible;
+ if (isSource) {
+ typesCompatible = checkIfCompatible(
+ physicalFieldType,
+ logicalFieldType);
+ } else {
+ typesCompatible = checkIfCompatible(
+ logicalFieldType,
+ physicalFieldType);
+ }
+
+ if (!typesCompatible) {
+ throw exceptionSupplier.apply(null);
+ }
+ } catch (Exception e) {
+ throw exceptionSupplier.apply(e);
+ }
}
private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) {
@@ -243,38 +263,85 @@ public final class TypeMappingUtils {
);
}
- private static void checkIfCompatible(
- LogicalType physicalFieldType,
- LogicalType logicalFieldType,
- Function<Throwable, ValidationException> exceptionSupplier) {
- if (LogicalTypeChecks.areTypesCompatible(physicalFieldType, logicalFieldType)) {
- return;
+ private static boolean checkIfCompatible(
+ LogicalType sourceType,
+ LogicalType targetType) {
+ if (LogicalTypeChecks.areTypesCompatible(sourceType, targetType)) {
+ return true;
+ }
+
+ Boolean targetTypeCompatible = targetType.accept(new LogicalTypeDefaultVisitor<Boolean>() {
+
+ @Override
+ public Boolean visit(VarCharType targetType) {
+ if (sourceType.isNullable() && !targetType.isNullable()) {
+ return false;
+ }
+ // CHAR and VARCHAR are very compatible within bounds
+ if ((hasRoot(sourceType, LogicalTypeRoot.CHAR) || hasRoot(sourceType, LogicalTypeRoot.VARCHAR)) &&
+ getLength(sourceType) <= targetType.getLength()) {
+ return true;
+ }
+ return defaultMethod(targetType);
+ }
+
+ @Override
+ public Boolean visit(VarBinaryType targetType) {
+ if (sourceType.isNullable() && !targetType.isNullable()) {
+ return false;
+ }
+ // BINARY and VARBINARY are very compatible within bounds
+ if ((hasRoot(sourceType, LogicalTypeRoot.BINARY) || hasRoot(sourceType, LogicalTypeRoot.VARBINARY)) &&
+ getLength(sourceType) <= targetType.getLength()) {
+ return true;
+ }
+ return defaultMethod(targetType);
+ }
+
+ @Override
+ protected Boolean defaultMethod(LogicalType logicalType) {
+ return false;
+ }
+ });
+
+ if (targetTypeCompatible) {
+ return true;
}
- physicalFieldType.accept(new LogicalTypeDefaultVisitor<Void>() {
+ return sourceType.accept(new LogicalTypeDefaultVisitor<Boolean>() {
+ @Override
+ public Boolean visit(DecimalType sourceType1) {
+ //When targetType is a legacy decimal type, pass the check.
+ if (targetType instanceof LegacyTypeInformationType
+ && targetType.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
+ return true;
+ }
+ return defaultMethod(sourceType1);
+ }
+
@Override
- public Void visit(LogicalType other) {
+ public Boolean visit(LogicalType other) {
if (other instanceof LegacyTypeInformationType && other.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
- if (!(logicalFieldType instanceof DecimalType)) {
- throw exceptionSupplier.apply(null);
+ if (!(targetType instanceof DecimalType)) {
+ return false;
}
- DecimalType logicalDecimalType = (DecimalType) logicalFieldType;
+ DecimalType logicalDecimalType = (DecimalType) targetType;
if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION ||
- logicalDecimalType.getScale() != 18) {
- throw exceptionSupplier.apply(new ValidationException(
- "Legacy decimal type can only be mapped to DECIMAL(38, 18)."));
+ logicalDecimalType.getScale() != 18) {
+ throw new ValidationException(
+ "Legacy decimal type can only be mapped to DECIMAL(38, 18).");
}
- return null;
+ return true;
}
return defaultMethod(other);
}
@Override
- protected Void defaultMethod(LogicalType logicalType) {
- throw exceptionSupplier.apply(null);
+ protected Boolean defaultMethod(LogicalType logicalType) {
+ return false;
}
});
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
index daadd66..ab15581 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
@@ -18,16 +18,23 @@
package org.apache.flink.table.utils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
import org.junit.Rule;
import org.junit.Test;
@@ -331,6 +338,30 @@ public class TypeMappingUtilsTest {
);
}
+ @Test
+ public void testCheckPhysicalLogicalTypeCompatible() {
+ TableSchema tableSchema = TableSchema.builder()
+ .field("a", DataTypes.VARCHAR(2))
+ .field("b", DataTypes.DECIMAL(20, 2))
+ .build();
+ TableSink<Tuple2<Boolean, Row>> tableSink = new TestTableSink(tableSchema);
+ LegacyTypeInformationType<?> legacyDataType = (LegacyTypeInformationType<?>) tableSink.getConsumedDataType()
+ .getLogicalType();
+ TypeInformation<?> legacyTypeInfo = ((TupleTypeInfo<?>) legacyDataType.getTypeInformation()).getTypeAt(1);
+ DataType physicalType = TypeConversions.fromLegacyInfoToDataType(legacyTypeInfo);
+ TableSchema physicSchema = DataTypeUtils.expandCompositeTypeToSchema(physicalType);
+ DataType[] logicalDataTypes = tableSchema.getFieldDataTypes();
+ DataType[] physicalDataTypes = physicSchema.getFieldDataTypes();
+ for (int i = 0; i < logicalDataTypes.length; i++) {
+ TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
+ physicalDataTypes[i].getLogicalType(),
+ logicalDataTypes[i].getLogicalType(),
+ "physicalField",
+ "logicalField",
+ false);
+ }
+ }
+
private static class TestTableSource
implements TableSource<Object>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
@@ -370,4 +401,41 @@ public class TypeMappingUtilsTest {
throw new UnsupportedOperationException("Should not be called");
}
}
+
+ /**
+ * Since UpsertStreamTableSink not in flink-table-common module, here we use Tuple2 <Boolean, Row> to
+ * simulate the behavior of UpsertStreamTableSink.
+ */
+ private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> {
+ private final TableSchema tableSchema;
+
+ private TestTableSink(TableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ TypeInformation<Row> getRecordType() {
+ return tableSchema.toRowType();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+ return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return tableSchema.getFieldNames();
+ }
+
+ @Override
+ public TypeInformation<?>[] getFieldTypes() {
+ return tableSchema.getFieldTypes();
+ }
+
+ @Override
+ public TableSink<Tuple2<Boolean, Row>> configure(
+ String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ return null;
+ }
+ }
}