Posted to commits@flink.apache.org by dw...@apache.org on 2020/05/05 18:05:04 UTC

[flink] branch release-1.10 updated: [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new ff7b4f6  [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)
ff7b4f6 is described below

commit ff7b4f6f0e073477b7a93dbda8243e7fc8647f50
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue May 5 20:04:20 2020 +0200

    [FLINK-17313] Fix type validation error when sink table ddl contains columns with precision of decimal/varchar (#11993)
    
    Co-authored-by: Terry Wang <zj...@foxmail.com>
---
 .../apache/flink/table/utils/TypeMappingUtils.java | 115 ++++++++++++++++-----
 .../flink/table/utils/TypeMappingUtilsTest.java    |  68 ++++++++++++
 2 files changed, 159 insertions(+), 24 deletions(-)
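
Before this change, checkPhysicalLogicalTypeCompatible compared the physical and the
logical type symmetrically, so a sink DDL declaring a bounded precision such as
VARCHAR(2) or DECIMAL(20, 2) failed validation against the wider physical type derived
from the legacy type system. The commit makes the check directional: for a sink, the DDL
type only has to fit into the physical type. A minimal sketch of a call that used to
throw and now passes; the class name and the field names "f0" and "a" are illustrative,
not taken from the commit:

    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.flink.table.utils.TypeMappingUtils;

    public class SinkPrecisionDemo {
        public static void main(String[] args) {
            // DDL column VARCHAR(2) written to a physical VARCHAR(MAX) field,
            // e.g. one derived from a legacy String type. With isSource = false,
            // the DDL type only has to fit within the physical bound.
            TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
                new VarCharType(VarCharType.MAX_LENGTH), // physical field type
                new VarCharType(2),                      // logical (DDL) field type
                "f0",                                    // physical field name
                "a",                                     // logical field name
                false);                                  // false = sink direction
        }
    }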

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
index 8239100..e284787 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeMappingUtils.java
@@ -32,6 +32,8 @@ import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -46,7 +48,9 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
 
 /**
  * Utility methods for dealing with field types in {@link org.apache.flink.table.sources.TableSource}
@@ -157,19 +161,35 @@ public final class TypeMappingUtils {
 			String physicalFieldName,
 			String logicalFieldName,
 			boolean isSource) {
-		checkIfCompatible(
-			physicalFieldType,
-			logicalFieldType,
-			(cause) -> new ValidationException(
+		Function<Throwable, ValidationException> exceptionSupplier = (cause) ->
+			new ValidationException(
 				String.format(
 					"Type %s of table field '%s' does not match with " +
-						"the physical type %s of the '%s' field of the %s.",
+						"the physical type %s of the '%s' field of the %s type.",
 					logicalFieldType,
 					logicalFieldName,
 					physicalFieldType,
 					physicalFieldName,
-					isSource ? "TableSource return type" : "TableSink consumed type"),
-				cause));
+					isSource ? "TableSource return" : "TableSink consumed"),
+				cause);
+		try {
+			final boolean typesCompatible;
+			if (isSource) {
+				typesCompatible = checkIfCompatible(
+					physicalFieldType,
+					logicalFieldType);
+			} else {
+				typesCompatible = checkIfCompatible(
+					logicalFieldType,
+					physicalFieldType);
+			}
+
+			if (!typesCompatible) {
+				throw exceptionSupplier.apply(null);
+			}
+		} catch (Exception e) {
+			throw exceptionSupplier.apply(e);
+		}
 	}
 
 	private static void verifyTimeAttributeType(TableColumn logicalColumn, String rowtimeOrProctime) {
@@ -243,38 +263,85 @@ public final class TypeMappingUtils {
 		);
 	}
 
-	private static void checkIfCompatible(
-			LogicalType physicalFieldType,
-			LogicalType logicalFieldType,
-			Function<Throwable, ValidationException> exceptionSupplier) {
-		if (LogicalTypeChecks.areTypesCompatible(physicalFieldType, logicalFieldType)) {
-			return;
+	private static boolean checkIfCompatible(
+			LogicalType sourceType,
+			LogicalType targetType) {
+		if (LogicalTypeChecks.areTypesCompatible(sourceType, targetType)) {
+			return true;
+		}
+
+		Boolean targetTypeCompatible = targetType.accept(new LogicalTypeDefaultVisitor<Boolean>() {
+
+			@Override
+			public Boolean visit(VarCharType targetType) {
+				if (sourceType.isNullable() && !targetType.isNullable()) {
+					return false;
+				}
+				// a CHAR or VARCHAR source is compatible as long as its length fits within the target bound
+				if ((hasRoot(sourceType, LogicalTypeRoot.CHAR) || hasRoot(sourceType, LogicalTypeRoot.VARCHAR)) &&
+					getLength(sourceType) <= targetType.getLength()) {
+					return true;
+				}
+				return defaultMethod(targetType);
+			}
+
+			@Override
+			public Boolean visit(VarBinaryType targetType) {
+				if (sourceType.isNullable() && !targetType.isNullable()) {
+					return false;
+				}
+				// a BINARY or VARBINARY source is compatible as long as its length fits within the target bound
+				if ((hasRoot(sourceType, LogicalTypeRoot.BINARY) || hasRoot(sourceType, LogicalTypeRoot.VARBINARY)) &&
+					getLength(sourceType) <= targetType.getLength()) {
+					return true;
+				}
+				return defaultMethod(targetType);
+			}
+
+			@Override
+			protected Boolean defaultMethod(LogicalType logicalType) {
+				return false;
+			}
+		});
+
+		if (targetTypeCompatible) {
+			return true;
 		}
 
-		physicalFieldType.accept(new LogicalTypeDefaultVisitor<Void>() {
+		return sourceType.accept(new LogicalTypeDefaultVisitor<Boolean>() {
+			@Override
+			public Boolean visit(DecimalType sourceType1) {
+				// when targetType is a legacy decimal type, the check passes
+				if (targetType instanceof LegacyTypeInformationType
+					&& targetType.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
+					return true;
+				}
+				return defaultMethod(sourceType1);
+			}
+
 			@Override
-			public Void visit(LogicalType other) {
+			public Boolean visit(LogicalType other) {
 				if (other instanceof LegacyTypeInformationType && other.getTypeRoot() == LogicalTypeRoot.DECIMAL) {
-					if (!(logicalFieldType instanceof DecimalType)) {
-						throw exceptionSupplier.apply(null);
+					if (!(targetType instanceof DecimalType)) {
+						return false;
 					}
 
-					DecimalType logicalDecimalType = (DecimalType) logicalFieldType;
+					DecimalType logicalDecimalType = (DecimalType) targetType;
 					if (logicalDecimalType.getPrecision() != DecimalType.MAX_PRECISION ||
-							logicalDecimalType.getScale() != 18) {
-						throw exceptionSupplier.apply(new ValidationException(
-							"Legacy decimal type can only be mapped to DECIMAL(38, 18)."));
+						logicalDecimalType.getScale() != 18) {
+						throw new ValidationException(
+							"Legacy decimal type can only be mapped to DECIMAL(38, 18).");
 					}
 
-					return null;
+					return true;
 				}
 
 				return defaultMethod(other);
 			}
 
 			@Override
-			protected Void defaultMethod(LogicalType logicalType) {
-				throw exceptionSupplier.apply(null);
+			protected Boolean defaultMethod(LogicalType logicalType) {
+				return false;
 			}
 		});
 	}
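
The rewritten check above is directional: checkIfCompatible(sourceType, targetType)
accepts a CHAR/VARCHAR (or BINARY/VARBINARY) source whose length fits within the target
bound, and accepts a DECIMAL source whenever the target is a legacy decimal type; every
other pairing must still be compatible exactly. A sketch of the resulting behaviour at
the public entry point, assuming flink-table-common 1.10.x on the classpath; the bound
of 5 and the field names are illustrative:

    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.flink.table.utils.TypeMappingUtils;

    public class VarCharBoundDemo {
        public static void main(String[] args) {
            // Fits: the DDL VARCHAR(2) is within the physical bound VARCHAR(5).
            TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
                new VarCharType(5), new VarCharType(2), "f0", "a", false);

            // Does not fit: VARCHAR(10) exceeds the physical bound VARCHAR(5),
            // so the visitor falls through to defaultMethod and validation throws.
            try {
                TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
                    new VarCharType(5), new VarCharType(10), "f0", "a", false);
            } catch (ValidationException e) {
                System.out.println("Rejected as expected: " + e.getMessage());
            }
        }
    }
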
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
index daadd66..ab15581 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TypeMappingUtilsTest.java
@@ -18,16 +18,23 @@
 
 package org.apache.flink.table.utils;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -331,6 +338,30 @@ public class TypeMappingUtilsTest {
 		);
 	}
 
+	@Test
+	public void testCheckPhysicalLogicalTypeCompatible() {
+		TableSchema tableSchema = TableSchema.builder()
+								.field("a", DataTypes.VARCHAR(2))
+								.field("b", DataTypes.DECIMAL(20, 2))
+								.build();
+		TableSink<Tuple2<Boolean, Row>> tableSink = new TestTableSink(tableSchema);
+		LegacyTypeInformationType<?> legacyDataType = (LegacyTypeInformationType<?>) tableSink.getConsumedDataType()
+														.getLogicalType();
+		TypeInformation<?> legacyTypeInfo = ((TupleTypeInfo<?>) legacyDataType.getTypeInformation()).getTypeAt(1);
+		DataType physicalType = TypeConversions.fromLegacyInfoToDataType(legacyTypeInfo);
+		TableSchema physicalSchema = DataTypeUtils.expandCompositeTypeToSchema(physicalType);
+		DataType[] logicalDataTypes = tableSchema.getFieldDataTypes();
+		DataType[] physicalDataTypes = physicalSchema.getFieldDataTypes();
+		for (int i = 0; i < logicalDataTypes.length; i++) {
+			TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
+					physicalDataTypes[i].getLogicalType(),
+					logicalDataTypes[i].getLogicalType(),
+					"physicalField",
+					"logicalField",
+					false);
+		}
+	}
+
 	private static class TestTableSource
 		implements TableSource<Object>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
 
@@ -370,4 +401,41 @@ public class TypeMappingUtilsTest {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 	}
+
+	/**
+	 * Since UpsertStreamTableSink is not in the flink-table-common module, Tuple2&lt;Boolean, Row&gt; is
+	 * used here to simulate the behavior of UpsertStreamTableSink.
+	 */
+	private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> {
+		private final TableSchema tableSchema;
+
+		private TestTableSink(TableSchema tableSchema) {
+			this.tableSchema = tableSchema;
+		}
+
+		TypeInformation<Row> getRecordType() {
+			return tableSchema.toRowType();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
+			return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
+		}
+
+		@Override
+		public String[] getFieldNames() {
+			return tableSchema.getFieldNames();
+		}
+
+		@Override
+		public TypeInformation<?>[] getFieldTypes() {
+			return tableSchema.getFieldTypes();
+		}
+
+		@Override
+		public TableSink<Tuple2<Boolean, Row>> configure(
+				String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+			return null;
+		}
+	}
 }
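
For reference, the legacy decimal path exercised by the new test can be reproduced
directly: Types.BIG_DEC converts to a LegacyTypeInformationType with the DECIMAL root,
which the new DecimalType visitor branch accepts for any target precision and scale,
while a legacy decimal on the physical source side still maps only to DECIMAL(38, 18).
A sketch under the same 1.10.x assumption; the field names "f1" and "b" are placeholders:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.types.logical.DecimalType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.utils.TypeConversions;
    import org.apache.flink.table.utils.TypeMappingUtils;

    public class LegacyDecimalDemo {
        public static void main(String[] args) {
            // A legacy BigDecimal field from the old type system; its logical
            // type is a LegacyTypeInformationType with root DECIMAL.
            LogicalType legacyDecimal =
                TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC)
                    .getLogicalType();

            // Sink direction: the DDL type DECIMAL(20, 2) is checked against
            // the legacy decimal physical type and is now accepted.
            TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
                legacyDecimal, new DecimalType(20, 2), "f1", "b", false);
        }
    }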