You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/03 15:02:09 UTC

[flink] branch master updated (07133e6 -> 1d4bd2b)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 07133e6  [FLINK-18051][AZP] Fail Maven setup stage on error
     new b2f8866  [hotfix][table-planner-blink] Prepare ExpressionTestBase for new type system
     new 7dc4193  [hotfix][table-common] Align explicit casting with Calcite's SqlTypeCoercionRule
     new 87d8087  [hotfix][table-common] Add constraint argument type strategy
     new 1d4bd2b  [FLINK-18005][table] Implement type inference for CAST

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resolver/rules/ResolveCallByArgumentsRule.java |  18 +-
 .../functions/BuiltInFunctionDefinitions.java      |   4 +-
 .../types/inference/ConstantArgumentCount.java     |  15 +-
 .../table/types/inference/InputTypeStrategies.java |  26 +-
 .../table/types/inference/TypeStrategies.java      |  25 +-
 .../strategies/ArrayInputTypeStrategy.java         |   5 +-
 ...ypeStrategy.java => CastInputTypeStrategy.java} |  61 +++--
 .../strategies/ComparableTypeStrategy.java         |   5 +-
 ...gy.java => ConstraintArgumentTypeStrategy.java} |  54 ++--
 .../inference/strategies/MapInputTypeStrategy.java |   5 +-
 .../inference/strategies/NullableTypeStrategy.java |  94 +++++++
 .../types/logical/utils/LogicalTypeCasts.java      |  33 ++-
 .../flink/table/types/LogicalTypeCastsTest.java    |   2 +-
 .../types/inference/InputTypeStrategiesTest.java   |  67 ++++-
 .../inference/InputTypeStrategiesTestBase.java     |  12 +
 .../table/types/inference/TypeStrategiesTest.java  |  51 +++-
 .../types/inference/utils/CallContextMock.java     |   2 +-
 .../expressions/PlannerExpressionConverter.scala   |  12 +-
 .../table/planner/expressions/aggregations.scala   |   2 +-
 .../flink/table/planner/expressions/call.scala     |  13 +-
 .../flink/table/planner/expressions/cast.scala     |  44 ---
 .../flink/table/planner/expressions/time.scala     |   4 +-
 .../planner/expressions/DecimalTypeTest.scala      | 297 ++++++---------------
 .../table/planner/expressions/RowTypeTest.scala    |   2 -
 .../planner/expressions/TemporalTypesTest.scala    | 206 ++++----------
 .../expressions/utils/ExpressionTestBase.scala     | 116 +++++---
 .../table/expressions/TemporalTypesTest.scala      |  20 --
 27 files changed, 576 insertions(+), 619 deletions(-)
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{WildcardInputTypeStrategy.java => CastInputTypeStrategy.java} (51%)
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/{LiteralArgumentTypeStrategy.java => ConstraintArgumentTypeStrategy.java} (55%)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/NullableTypeStrategy.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/cast.scala


[flink] 04/04: [FLINK-18005][table] Implement type inference for CAST

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d4bd2b2d2c5ba392f1cc801036b63d2fd8c483e
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri May 29 09:05:01 2020 +0200

    [FLINK-18005][table] Implement type inference for CAST
    
    This closes #12411.
---
 .../resolver/rules/ResolveCallByArgumentsRule.java |  18 +-
 .../functions/BuiltInFunctionDefinitions.java      |   4 +-
 .../types/inference/ConstantArgumentCount.java     |  15 +-
 .../table/types/inference/InputTypeStrategies.java |  15 +-
 .../table/types/inference/TypeStrategies.java      |  25 +-
 .../strategies/ArrayInputTypeStrategy.java         |   5 +-
 ...ypeStrategy.java => CastInputTypeStrategy.java} |  54 ++--
 .../strategies/ComparableTypeStrategy.java         |   5 +-
 .../inference/strategies/MapInputTypeStrategy.java |   5 +-
 .../inference/strategies/NullableTypeStrategy.java |  94 +++++++
 .../types/inference/InputTypeStrategiesTest.java   |  39 ++-
 .../inference/InputTypeStrategiesTestBase.java     |  12 +
 .../table/types/inference/TypeStrategiesTest.java  |  51 +++-
 .../types/inference/utils/CallContextMock.java     |   2 +-
 .../expressions/PlannerExpressionConverter.scala   |  12 +-
 .../table/planner/expressions/aggregations.scala   |   2 +-
 .../flink/table/planner/expressions/call.scala     |  13 +-
 .../flink/table/planner/expressions/cast.scala     |  44 ---
 .../flink/table/planner/expressions/time.scala     |   4 +-
 .../planner/expressions/DecimalTypeTest.scala      | 297 ++++++---------------
 .../table/planner/expressions/RowTypeTest.scala    |   2 -
 .../planner/expressions/TemporalTypesTest.scala    | 206 ++++----------
 .../table/expressions/TemporalTypesTest.scala      |  20 --
 23 files changed, 404 insertions(+), 540 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 3526f48..e3cd813 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -324,19 +325,34 @@ final class ResolveCallByArgumentsRule implements ResolverRule {
 
 		@Override
 		public boolean isArgumentLiteral(int pos) {
-			return getArgument(pos) instanceof ValueLiteralExpression;
+			final ResolvedExpression arg = getArgument(pos);
+			return arg instanceof ValueLiteralExpression || arg instanceof TypeLiteralExpression;
 		}
 
 		@Override
 		public boolean isArgumentNull(int pos) {
 			Preconditions.checkArgument(isArgumentLiteral(pos), "Argument at position %s is not a literal.", pos);
+			final ResolvedExpression arg = getArgument(pos);
+			// special case for type literals in Table API only
+			if (arg instanceof TypeLiteralExpression) {
+				return false;
+			}
 			final ValueLiteralExpression literal = (ValueLiteralExpression) getArgument(pos);
 			return literal.isNull();
 		}
 
 		@Override
+		@SuppressWarnings("unchecked")
 		public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
 			Preconditions.checkArgument(isArgumentLiteral(pos), "Argument at position %s is not a literal.", pos);
+			final ResolvedExpression arg = getArgument(pos);
+			// special case for type literals in Table API only
+			if (arg instanceof TypeLiteralExpression) {
+				if (!DataType.class.isAssignableFrom(clazz)) {
+					return Optional.empty();
+				}
+				return Optional.of((T) arg.getOutputDataType());
+			}
 			final ValueLiteralExpression literal = (ValueLiteralExpression) getArgument(pos);
 			return literal.getValueAs(clazz);
 		}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 8a72ecc..667d075 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -40,6 +40,7 @@ import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.OTHER;
 import static org.apache.flink.table.functions.FunctionKind.SCALAR;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.SPECIFIC_FOR_CAST;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.TWO_EQUALS_COMPARABLE;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.TWO_FULLY_COMPARABLE;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
@@ -919,7 +920,8 @@ public final class BuiltInFunctionDefinitions {
 		new BuiltInFunctionDefinition.Builder()
 			.name("cast")
 			.kind(SCALAR)
-			.outputTypeStrategy(TypeStrategies.MISSING)
+			.inputTypeStrategy(SPECIFIC_FOR_CAST)
+			.outputTypeStrategy(nullable(ConstantArgumentCount.to(0), TypeStrategies.argument(1)))
 			.build();
 	public static final BuiltInFunctionDefinition REINTERPRET_CAST =
 		new BuiltInFunctionDefinition.Builder()
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java
index 4ea2df7..95072e2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ConstantArgumentCount.java
@@ -27,7 +27,7 @@ import java.util.Optional;
 /**
  * Helper class for {@link ArgumentCount} with constant boundaries.
  *
- * <p>Note: All boundaries of this class are inclusive.
+ * <p>Note: All boundaries of this class are inclusive. All indices are 0-based.
  */
 @Internal
 public final class ConstantArgumentCount implements ArgumentCount {
@@ -43,23 +43,28 @@ public final class ConstantArgumentCount implements ArgumentCount {
 		this.maxCount = maxCount;
 	}
 
-	public static ArgumentCount of(int count) {
+	public static ConstantArgumentCount of(int count) {
 		Preconditions.checkArgument(count >= 0);
 		return new ConstantArgumentCount(count, count);
 	}
 
-	public static ArgumentCount between(int minCount, int maxCount) {
+	public static ConstantArgumentCount between(int minCount, int maxCount) {
 		Preconditions.checkArgument(minCount <= maxCount);
 		Preconditions.checkArgument(minCount >= 0);
 		return new ConstantArgumentCount(minCount, maxCount);
 	}
 
-	public static ArgumentCount from(int minCount) {
+	public static ConstantArgumentCount from(int minCount) {
 		Preconditions.checkArgument(minCount >= 0);
 		return new ConstantArgumentCount(minCount, OPEN_INTERVAL);
 	}
 
-	public static ArgumentCount any() {
+	public static ConstantArgumentCount to(int maxCount) {
+		Preconditions.checkArgument(maxCount >= 0);
+		return new ConstantArgumentCount(0, maxCount);
+	}
+
+	public static ConstantArgumentCount any() {
 		return new ConstantArgumentCount(0, OPEN_INTERVAL);
 	}
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 615f392..58d77ac 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -19,10 +19,12 @@
 package org.apache.flink.table.types.inference;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ArrayInputTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.CastInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
@@ -149,7 +151,7 @@ public final class InputTypeStrategies {
 	 * arguments.
 	 */
 	public static InputTypeStrategy comparable(
-			ArgumentCount argumentCount,
+			ConstantArgumentCount argumentCount,
 			StructuredComparision requiredComparision) {
 		return new ComparableTypeStrategy(argumentCount, requiredComparision);
 	}
@@ -260,18 +262,23 @@ public final class InputTypeStrategies {
 
 
 	// --------------------------------------------------------------------------------------------
-	// Specific type strategies
+	// Specific input type strategies
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Strategy specific for {@link org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
+	 * Strategy specific for {@link BuiltInFunctionDefinitions#CAST}.
+	 */
+	public static final InputTypeStrategy SPECIFIC_FOR_CAST = new CastInputTypeStrategy();
+
+	/**
+	 * Strategy specific for {@link BuiltInFunctionDefinitions#ARRAY}.
 	 *
 	 * <p>It expects at least one argument. All the arguments must have a common super type.
 	 */
 	public static final InputTypeStrategy SPECIFIC_FOR_ARRAY = new ArrayInputTypeStrategy();
 
 	/**
-	 * Strategy specific for {@link org.apache.flink.table.functions.BuiltInFunctionDefinitions#MAP}.
+	 * Strategy specific for {@link BuiltInFunctionDefinitions#MAP}.
 	 *
 	 * <p>It expects at least two arguments. There must be even number of arguments.
 	 * All the keys and values must have a common super type respectively.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
index a60c793..3844562 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.strategies.ExplicitTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.MappingTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.MissingTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.NullableTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.UseArgumentTypeStrategy;
 
 import java.util.List;
@@ -67,25 +68,19 @@ public final class TypeStrategies {
 	}
 
 	/**
+	 * A type strategy that can be used to make a result type nullable if any of the selected
+	 * input arguments is nullable. Otherwise the type will be not null.
+	 */
+	public static TypeStrategy nullable(ConstantArgumentCount includedArgs, TypeStrategy initialStrategy) {
+		return new NullableTypeStrategy(includedArgs, initialStrategy);
+	}
+
+	/**
 	 * A type strategy that can be used to make a result type nullable if any of the
 	 * input arguments is nullable. Otherwise the type will be not null.
 	 */
 	public static TypeStrategy nullable(TypeStrategy initialStrategy) {
-		return callContext -> {
-			Optional<DataType> initialDataType = initialStrategy.inferType(callContext);
-			return initialDataType.map(inferredDataType -> {
-					boolean isNullableArgument = callContext.getArgumentDataTypes()
-						.stream()
-						.anyMatch(dataType -> dataType.getLogicalType().isNullable());
-
-					if (isNullableArgument) {
-						return inferredDataType.nullable();
-					} else {
-						return inferredDataType.notNull();
-					}
-				}
-			);
-		};
+		return nullable(ConstantArgumentCount.any(), initialStrategy);
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java
index 201d7e1..42b61c2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentCount;
@@ -36,12 +37,12 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * {@link InputTypeStrategy} specific for {@link org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#ARRAY}.
  *
  * <p>It expects at least one argument. All the arguments must have a common super type.
  */
 @Internal
-public class ArrayInputTypeStrategy implements InputTypeStrategy {
+public final class ArrayInputTypeStrategy implements InputTypeStrategy {
 	@Override
 	public ArgumentCount getArgumentCount() {
 		return ConstantArgumentCount.from(1);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
similarity index 51%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
index 201d7e1..26e8e4c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayInputTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CastInputTypeStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentCount;
@@ -26,50 +27,59 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
-import org.apache.flink.table.types.utils.TypeConversions;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsExplicitCast;
 
 /**
- * {@link InputTypeStrategy} specific for {@link org.apache.flink.table.functions.BuiltInFunctionDefinitions#ARRAY}.
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#CAST}.
  *
- * <p>It expects at least one argument. All the arguments must have a common super type.
+ * <p>It expects two arguments where the type of first one must be castable to the type of the second
+ * one. The second one must be a type literal.
  */
 @Internal
-public class ArrayInputTypeStrategy implements InputTypeStrategy {
+public final class CastInputTypeStrategy implements InputTypeStrategy {
+
 	@Override
 	public ArgumentCount getArgumentCount() {
-		return ConstantArgumentCount.from(1);
+		return ConstantArgumentCount.of(2);
 	}
 
 	@Override
-	public Optional<List<DataType>> inferInputTypes(
-		CallContext callContext,
-		boolean throwOnFailure) {
-		List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
-		if (argumentDataTypes.size() == 0) {
+	public Optional<List<DataType>> inferInputTypes(CallContext callContext, boolean throwOnFailure) {
+		// check for type literal
+		if (!callContext.isArgumentLiteral(1) || !callContext.getArgumentValue(1, DataType.class).isPresent()) {
 			return Optional.empty();
 		}
 
-		Optional<LogicalType> commonType = LogicalTypeGeneralization.findCommonType(
-			argumentDataTypes
-				.stream()
-				.map(DataType::getLogicalType)
-				.collect(Collectors.toList())
-		);
+		final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+		final LogicalType fromType = argumentDataTypes.get(0).getLogicalType();
+		final LogicalType toType = argumentDataTypes.get(1).getLogicalType();
 
-		return commonType.map(type -> Collections.nCopies(
-			argumentDataTypes.size(),
-			TypeConversions.fromLogicalToDataType(type)));
+		// A hack to support legacy types. To be removed when we drop the legacy types.
+		if (fromType instanceof LegacyTypeInformationType) {
+			return Optional.of(argumentDataTypes);
+		}
+		if (!supportsExplicitCast(fromType, toType)) {
+			if (throwOnFailure) {
+				throw callContext.newValidationError(
+					"Unsupported cast from '%s' to '%s'.",
+					fromType,
+					toType);
+			}
+			return Optional.empty();
+		}
+		return Optional.of(argumentDataTypes);
 	}
 
 	@Override
 	public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
-		return Collections.singletonList(Signature.of(Signature.Argument.of("*")));
+		return Collections.singletonList(
+			Signature.of(Signature.Argument.of("<ANY>"), Signature.Argument.of("<TYPE LITERAL>")));
 	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
index ef92224..00795e6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ComparableTypeStrategy.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentCount;
 import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
 import org.apache.flink.table.types.logical.DistinctType;
@@ -54,10 +55,10 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRo
 @Internal
 public final class ComparableTypeStrategy implements InputTypeStrategy {
 	private final StructuredComparision requiredComparision;
-	private final ArgumentCount argumentCount;
+	private final ConstantArgumentCount argumentCount;
 
 	public ComparableTypeStrategy(
-			ArgumentCount argumentCount,
+			ConstantArgumentCount argumentCount,
 			StructuredComparision requiredComparision) {
 		Preconditions.checkArgument(
 			argumentCount.getMinCount().map(c -> c >= 2).orElse(false),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapInputTypeStrategy.java
index 1ea0411..7954777 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapInputTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapInputTypeStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.ArgumentCount;
@@ -37,13 +38,13 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
- * {@link InputTypeStrategy} specific for {@link org.apache.flink.table.functions.BuiltInFunctionDefinitions#MAP}.
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#MAP}.
  *
  * <p>It expects at least two arguments. There must be even number of arguments.
  * All the keys and values must have a common super type respectively.
  */
 @Internal
-public class MapInputTypeStrategy implements InputTypeStrategy {
+public final class MapInputTypeStrategy implements InputTypeStrategy {
 
 	private static final ArgumentCount AT_LEAST_TWO_EVEN = new ArgumentCount() {
 		@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/NullableTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/NullableTypeStrategy.java
new file mode 100644
index 0000000..4862da8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/NullableTypeStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/**
+ * A type strategy that can be used to make a result type nullable if any of the selected
+ * input arguments is nullable. Otherwise the type will be not null.
+ */
+@Internal
+public final class NullableTypeStrategy implements TypeStrategy {
+
+	private final ConstantArgumentCount includedArguments;
+
+	private final TypeStrategy initialStrategy;
+
+	public NullableTypeStrategy(ConstantArgumentCount includedArguments, TypeStrategy initialStrategy) {
+		this.includedArguments = Preconditions.checkNotNull(includedArguments);
+		this.initialStrategy = Preconditions.checkNotNull(initialStrategy);
+	}
+
+	@Override
+	public Optional<DataType> inferType(CallContext callContext) {
+		return initialStrategy.inferType(callContext)
+			.map(inferredDataType -> {
+				final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+
+				if (argumentDataTypes.isEmpty()) {
+					return inferredDataType.notNull();
+				}
+
+				final int fromArg = includedArguments.getMinCount().orElse(0);
+
+				final int toArg = Math.min(
+					includedArguments.getMaxCount().map(c -> c + 1).orElse(argumentDataTypes.size()),
+					argumentDataTypes.size());
+
+				final boolean isNullableArgument = IntStream.range(fromArg, toArg)
+					.mapToObj(argumentDataTypes::get)
+					.anyMatch(dataType -> dataType.getLogicalType().isNullable());
+
+				if (isNullableArgument) {
+					return inferredDataType.nullable();
+				} else {
+					return inferredDataType.notNull();
+				}
+			});
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		NullableTypeStrategy that = (NullableTypeStrategy) o;
+		return includedArguments.equals(that.includedArguments) &&
+			initialStrategy.equals(that.initialStrategy);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(includedArguments, initialStrategy);
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 15f0bb9..b67da1e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -418,8 +418,8 @@ public class InputTypeStrategiesTest extends InputTypeStrategiesTestBase {
 				.expectErrorMessage("Invalid number of arguments. At least 2 arguments expected but 1 passed."),
 
 			TestSpec.forStrategy(
-				"Array strategy infers a common type",
-				InputTypeStrategies.SPECIFIC_FOR_ARRAY)
+					"Array strategy infers a common type",
+					InputTypeStrategies.SPECIFIC_FOR_ARRAY)
 				.calledWithArgumentTypes(
 					DataTypes.INT().notNull(),
 					DataTypes.BIGINT().notNull(),
@@ -428,20 +428,20 @@ public class InputTypeStrategiesTest extends InputTypeStrategiesTestBase {
 				.expectArgumentTypes(DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE()),
 
 			TestSpec.forStrategy(
-				"Array strategy fails for no arguments",
-				InputTypeStrategies.SPECIFIC_FOR_ARRAY)
+					"Array strategy fails for no arguments",
+					InputTypeStrategies.SPECIFIC_FOR_ARRAY)
 				.calledWithArgumentTypes()
 				.expectErrorMessage("Invalid number of arguments. At least 1 arguments expected but 0 passed."),
 
 			TestSpec.forStrategy(
-				"Array strategy fails for null arguments",
-				InputTypeStrategies.SPECIFIC_FOR_ARRAY)
+					"Array strategy fails for null arguments",
+					InputTypeStrategies.SPECIFIC_FOR_ARRAY)
 				.calledWithArgumentTypes(DataTypes.NULL())
 				.expectErrorMessage("Invalid input arguments."),
 
 			TestSpec.forStrategy(
-				"Map strategy infers common types",
-				InputTypeStrategies.SPECIFIC_FOR_MAP)
+					"Map strategy infers common types",
+					InputTypeStrategies.SPECIFIC_FOR_MAP)
 				.calledWithArgumentTypes(
 					DataTypes.INT().notNull(),
 					DataTypes.DOUBLE(),
@@ -454,17 +454,32 @@ public class InputTypeStrategiesTest extends InputTypeStrategiesTestBase {
 					DataTypes.DOUBLE()),
 
 			TestSpec.forStrategy(
-				"Map strategy fails for no arguments",
-				InputTypeStrategies.SPECIFIC_FOR_MAP)
+					"Map strategy fails for no arguments",
+					InputTypeStrategies.SPECIFIC_FOR_MAP)
 				.calledWithArgumentTypes()
 				.expectErrorMessage("Invalid number of arguments. At least 2 arguments expected but 0 passed."),
 
 			TestSpec.forStrategy(
-				"Map strategy fails for an odd number of arguments",
-				InputTypeStrategies.SPECIFIC_FOR_MAP)
+					"Map strategy fails for an odd number of arguments",
+					InputTypeStrategies.SPECIFIC_FOR_MAP)
 				.calledWithArgumentTypes(DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT())
 				.expectErrorMessage("Invalid number of arguments. 3 arguments passed."),
 
+			TestSpec.forStrategy(
+					"Cast strategy",
+					InputTypeStrategies.SPECIFIC_FOR_CAST)
+				.calledWithArgumentTypes(DataTypes.INT(), DataTypes.BIGINT())
+				.calledWithLiteralAt(1, DataTypes.BIGINT())
+				.expectSignature("f(<ANY>, <TYPE LITERAL>)")
+				.expectArgumentTypes(DataTypes.INT(), DataTypes.BIGINT()),
+
+			TestSpec.forStrategy(
+					"Cast strategy for invalid target type",
+					InputTypeStrategies.SPECIFIC_FOR_CAST)
+				.calledWithArgumentTypes(DataTypes.BOOLEAN(), DataTypes.DATE())
+				.calledWithLiteralAt(1, DataTypes.DATE())
+				.expectErrorMessage("Unsupported cast from 'BOOLEAN' to 'DATE'."),
+
 			TestSpec
 				.forStrategy(
 					"Logical type roots instead of concrete data types",
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
index 846733d..ef8efe8 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTestBase.java
@@ -95,6 +95,10 @@ public abstract class InputTypeStrategiesTestBase {
 		callContextMock.argumentLiterals = IntStream.range(0, actualArgumentTypes.size())
 			.mapToObj(i -> testSpec.literalPos != null && i == testSpec.literalPos)
 			.collect(Collectors.toList());
+		callContextMock.argumentValues = IntStream.range(0, actualArgumentTypes.size())
+			.mapToObj(i -> (testSpec.literalPos != null && i == testSpec.literalPos) ?
+				Optional.ofNullable(testSpec.literalValue) : Optional.empty())
+			.collect(Collectors.toList());
 		callContextMock.argumentNulls = IntStream.range(0, actualArgumentTypes.size())
 			.mapToObj(i -> false)
 			.collect(Collectors.toList());
@@ -156,6 +160,8 @@ public abstract class InputTypeStrategiesTestBase {
 
 		private @Nullable Integer literalPos;
 
+		private @Nullable Object literalValue;
+
 		private @Nullable InputTypeStrategy surroundingStrategy;
 
 		private @Nullable String expectedSignature;
@@ -202,6 +208,12 @@ public abstract class InputTypeStrategiesTestBase {
 			return this;
 		}
 
+		TestSpec calledWithLiteralAt(int pos, Object value) {
+			this.literalPos = pos;
+			this.literalValue = value;
+			return this;
+		}
+
 		TestSpec expectSignature(String signature) {
 			this.expectedSignature = signature;
 			return this;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java
index 102516c..b30ac43 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeStrategiesTest.java
@@ -113,37 +113,60 @@ public class TypeStrategiesTest {
 				.inputTypes()
 				.expectErrorMessage("Could not infer an output type for the given arguments. Untyped NULL received."),
 
-			TestSpec.forStrategy(
-				"Infer a row type",
-				TypeStrategies.ROW)
+			TestSpec
+				.forStrategy(
+					"Infer a row type",
+					TypeStrategies.ROW)
 				.inputTypes(DataTypes.BIGINT(), DataTypes.STRING())
 				.expectDataType(DataTypes.ROW(
 					DataTypes.FIELD("f0", DataTypes.BIGINT()),
 					DataTypes.FIELD("f1", DataTypes.STRING())).notNull()
 				),
 
-			TestSpec.forStrategy(
-				"Infer an array type",
-				TypeStrategies.ARRAY)
+			TestSpec
+				.forStrategy(
+					"Infer an array type",
+					TypeStrategies.ARRAY)
 				.inputTypes(DataTypes.BIGINT(), DataTypes.BIGINT())
 				.expectDataType(DataTypes.ARRAY(DataTypes.BIGINT()).notNull()),
 
-			TestSpec.forStrategy(
-				"Infer a map type",
-				TypeStrategies.MAP)
+			TestSpec.
+				forStrategy(
+					"Infer a map type",
+					TypeStrategies.MAP)
 				.inputTypes(DataTypes.BIGINT(), DataTypes.STRING().notNull())
 				.expectDataType(DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING().notNull()).notNull()),
 
+			TestSpec
+				.forStrategy(
+					"Cascading to nullable type",
+					nullable(explicit(DataTypes.BOOLEAN().notNull())))
+				.inputTypes(DataTypes.BIGINT().notNull(), DataTypes.VARCHAR(2).nullable())
+				.expectDataType(DataTypes.BOOLEAN().nullable()),
+
+			TestSpec
+				.forStrategy(
+					"Cascading to not null type",
+					nullable(explicit(DataTypes.BOOLEAN().nullable())))
+				.inputTypes(DataTypes.BIGINT().notNull(), DataTypes.VARCHAR(2).notNull())
+				.expectDataType(DataTypes.BOOLEAN().notNull()),
+
 			TestSpec.forStrategy(
-				"Cascading to nullable type",
-				nullable(explicit(DataTypes.BOOLEAN().notNull())))
+					"Cascading to not null type but only consider first argument",
+					nullable(ConstantArgumentCount.to(0), explicit(DataTypes.BOOLEAN().nullable())))
+				.inputTypes(DataTypes.BIGINT().notNull(), DataTypes.VARCHAR(2).nullable())
+				.expectDataType(DataTypes.BOOLEAN().notNull()),
+
+			TestSpec.forStrategy(
+					"Cascading to null type but only consider first two argument",
+					nullable(ConstantArgumentCount.to(1), explicit(DataTypes.BOOLEAN().nullable())))
 				.inputTypes(DataTypes.BIGINT().notNull(), DataTypes.VARCHAR(2).nullable())
 				.expectDataType(DataTypes.BOOLEAN().nullable()),
 
 			TestSpec.forStrategy(
-				"Cascading to not null type",
-				nullable(explicit(DataTypes.BOOLEAN().nullable())))
-				.inputTypes(DataTypes.BIGINT().notNull(), DataTypes.VARCHAR(2).notNull())
+					"Cascading to not null type but only consider the second and third argument",
+					nullable(ConstantArgumentCount.between(1, 2), explicit(DataTypes.BOOLEAN().nullable())))
+				.inputTypes(DataTypes.BIGINT().nullable(), DataTypes.BIGINT().notNull(), DataTypes.VARCHAR(2).notNull())
 				.expectDataType(DataTypes.BOOLEAN().notNull())
 		);
 	}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/CallContextMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/CallContextMock.java
index 02aefe3..d4a2747 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/CallContextMock.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/CallContextMock.java
@@ -70,7 +70,7 @@ public class CallContextMock implements CallContext {
 	@Override
 	@SuppressWarnings("unchecked")
 	public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
-		return (Optional<T>) argumentValues.get(pos);
+		return (Optional<T>) argumentValues.get(pos).filter(v -> clazz.isAssignableFrom(v.getClass()));
 	}
 
 	@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index ea98b0d..f088b85 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -42,13 +42,13 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
       definition, call.getChildren.asScala,
       () =>
         definition match {
-          case ROW | ARRAY | MAP => ApiResolvedCallExpression(call)
+          case ROW | ARRAY | MAP => ApiResolvedExpression(call.getOutputDataType)
           case _ =>
             if (definition.getKind == FunctionKind.AGGREGATE ||
               definition.getKind == FunctionKind.TABLE_AGGREGATE) {
               ApiResolvedAggregateCallExpression(call)
             } else {
-              ApiResolvedCallExpression(call)
+              ApiResolvedExpression(call.getOutputDataType)
             }
         })
   }
@@ -69,12 +69,6 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
 
     // special case: requires individual handling of child expressions
     func match {
-      case CAST =>
-        assert(children.size == 2)
-        return Cast(
-          children.head.accept(this),
-          fromDataTypeToTypeInfo(
-            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
 
       case REINTERPRET_CAST =>
         assert(children.size == 3)
@@ -735,7 +729,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
   }
 
   override def visit(typeLiteral: TypeLiteralExpression): PlannerExpression = {
-    throw new TableException("Unsupported type literal expression: " + typeLiteral)
+    ApiResolvedExpression(typeLiteral.getOutputDataType)
   }
 
   override def visit(tableRef: TableReferenceExpression): PlannerExpression = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
index ffb4822..7249c34 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/aggregations.scala
@@ -37,7 +37,7 @@ abstract sealed class Aggregation extends PlannerExpression {
 
 /**
  * Wrapper for call expressions resolved already in the API with the new type inference stack.
- * Separate from [[ApiResolvedCallExpression]] because others' expressions validation logic
+ * Separate from [[ApiResolvedExpression]] because others' expressions validation logic
  * check for the [[Aggregation]] trait.
  */
 case class ApiResolvedAggregateCallExpression(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
index 0d10647..3e97f7b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/call.scala
@@ -25,22 +25,21 @@ import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationRes
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 /**
- * Wrapper for call expressions resolved already in the API with the new type inference stack.
+ * Wrapper for expressions that have been resolved already in the API with the new type inference
+ * stack.
  */
-case class ApiResolvedCallExpression(
-    resolvedCall: CallExpression)
+case class ApiResolvedExpression(resolvedDataType: DataType)
   extends LeafExpression {
 
-  override private[flink] def resultType: TypeInformation[_] = TypeConversions
-    .fromDataTypeToLegacyInfo(
-      resolvedCall
-        .getOutputDataType)
+  override private[flink] def resultType: TypeInformation[_] =
+    TypeConversions.fromDataTypeToLegacyInfo(resolvedDataType)
 }
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/cast.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/cast.scala
deleted file mode 100644
index 37e6717..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/cast.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.expressions
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.planner.typeutils.TypeCoercion
-import org.apache.flink.table.planner.validate._
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-
-case class Cast(child: PlannerExpression, resultType: TypeInformation[_])
-  extends UnaryExpression {
-
-  override def toString = s"$child.cast($resultType)"
-
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
-    copy(child, resultType).asInstanceOf[this.type]
-  }
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (TypeCoercion.canCast(
-      fromTypeInfoToLogicalType(child.resultType),
-      fromTypeInfoToLogicalType(resultType))) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Unsupported cast from '${child.resultType}' to '$resultType'")
-    }
-  }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
index 8817f5c..98ceb71 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala
@@ -27,8 +27,8 @@ import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils
 import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.isTimeInterval
 import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
 import org.apache.calcite.rex._
+import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo
 
 case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpression)
   extends PlannerExpression {
@@ -53,6 +53,7 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress
           || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
           || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE
           || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
+          || temporal.resultType.isInstanceOf[LegacyLocalDateTimeTypeInfo]
           || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
           || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
         ValidationSuccess
@@ -64,6 +65,7 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress
           || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
           || temporal.resultType == LocalTimeTypeInfo.LOCAL_TIME
           || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME
+          || temporal.resultType.isInstanceOf[LegacyLocalDateTimeTypeInfo]
           || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
         ValidationSuccess
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
index be8220e..5ec5ad9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
@@ -18,50 +18,31 @@
 
 package org.apache.flink.table.planner.expressions
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
-import org.apache.flink.table.types.logical.DecimalType
+import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
-
 import org.junit.{Ignore, Test}
 
 class DecimalTypeTest extends ExpressionTestBase {
 
-  private def DECIMAL = (p: Int, s: Int) => new DecimalType(p, s)
-
-  private def BOOL = DataTypes.BOOLEAN.getLogicalType
-
-  private def INT = DataTypes.INT.getLogicalType
-
-  private def LONG = DataTypes.BIGINT.getLogicalType
-
-  private def DOUBLE = DataTypes.DOUBLE.getLogicalType
-
-  private def STRING = DataTypes.STRING.getLogicalType
-
   @Test
   def testDecimalLiterals(): Unit = {
     // implicit double
     testAllApis(
       11.2,
       "11.2",
-      "11.2",
       "11.2")
 
     // implicit double
     testAllApis(
       0.7623533651719233,
       "0.7623533651719233",
-      "0.7623533651719233",
       "0.7623533651719233")
 
     // explicit decimal (with precision of 19)
     testAllApis(
       BigDecimal("1234567891234567891"),
-      "1234567891234567891p",
       "1234567891234567891",
       "1234567891234567891")
 
@@ -83,48 +64,40 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       Double.MaxValue,
       Double.MaxValue.toString,
-      Double.MaxValue.toString,
       Double.MaxValue.toString)
 
     testAllApis(
       Double.MinValue,
       Double.MinValue.toString,
-      Double.MinValue.toString,
       Double.MinValue.toString)
 
     testAllApis(
       Double.MinValue.cast(DataTypes.FLOAT),
-      s"${Double.MinValue}.cast(FLOAT)",
       s"CAST(${Double.MinValue} AS FLOAT)",
       Float.NegativeInfinity.toString)
 
     testAllApis(
       Byte.MinValue.cast(DataTypes.TINYINT),
-      s"(${Byte.MinValue}).cast(BYTE)",
       s"CAST(${Byte.MinValue} AS TINYINT)",
       Byte.MinValue.toString)
 
     testAllApis(
       Byte.MinValue.cast(DataTypes.TINYINT) - 1.cast(DataTypes.TINYINT),
-      s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)",
       s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)",
       Byte.MaxValue.toString)
 
     testAllApis(
       Short.MinValue.cast(DataTypes.SMALLINT),
-      s"(${Short.MinValue}).cast(SHORT)",
       s"CAST(${Short.MinValue} AS SMALLINT)",
       Short.MinValue.toString)
 
     testAllApis(
       Int.MinValue.cast(DataTypes.INT) - 1,
-      s"(${Int.MinValue}).cast(INT) - 1",
       s"CAST(${Int.MinValue} AS INT) - 1",
       Int.MaxValue.toString)
 
     testAllApis(
       Long.MinValue.cast(DataTypes.BIGINT()),
-      s"(${Long.MinValue}L).cast(LONG)",
       s"CAST(${Long.MinValue} AS BIGINT)",
       Long.MinValue.toString)
   }
@@ -135,13 +108,11 @@ class DecimalTypeTest extends ExpressionTestBase {
 //    // from String
     testTableApi(
       "123456789123456789123456789".cast(DataTypes.DECIMAL(38, 0)),
-      "'123456789123456789123456789'.cast(DECIMAL)",
       "123456789123456789123456789")
 
     // from double
     testAllApis(
       'f3.cast(DataTypes.DECIMAL(38, 0)),
-      "f3.cast(DECIMAL)",
       "CAST(f3 AS DECIMAL)",
       "4")
   }
@@ -156,38 +127,32 @@ class DecimalTypeTest extends ExpressionTestBase {
     // to double
     testAllApis(
       'f0.cast(DataTypes.DOUBLE),
-      "f0.cast(DOUBLE)",
       "CAST(f0 AS DOUBLE)",
       "1.2345678912345679E8")
 
     // to int
     testAllApis(
       'f4.cast(DataTypes.INT),
-      "f4.cast(INT)",
       "CAST(f4 AS INT)",
       "123456789")
 
     // to long
     testAllApis(
       'f4.cast(DataTypes.BIGINT()),
-      "f4.cast(LONG)",
       "CAST(f4 AS BIGINT)",
       "123456789")
 
     // to boolean (not SQL compliant)
     testTableApi(
       'f1.cast(DataTypes.BOOLEAN),
-      "f1.cast(BOOLEAN)",
       "true")
 
     testTableApi(
       'f5.cast(DataTypes.BOOLEAN),
-      "f5.cast(BOOLEAN)",
       "false")
 
     testTableApi(
       BigDecimal("123456789.123456789123456789").cast(DataTypes.DOUBLE),
-      "(123456789.123456789123456789p).cast(DOUBLE)",
       "1.2345678912345679E8")
 
     // testing padding behaviour
@@ -208,39 +173,33 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f1 + 12,
       "f1 + 12",
-      "f1 + 12",
       "123456789123456789123456801")
 
     // implicit cast to decimal
     testAllApis(
-      valueLiteral(12) + 'f1,
-      "12 + f1",
+      lit(12) + 'f1,
       "12 + f1",
       "123456789123456789123456801")
 
     testAllApis(
       'f1 + BigDecimal("12.3"),
-      "f1 + 12.3p",
       "f1 + 12.3",
       "123456789123456789123456801.3"
     )
 
     testAllApis(
-      valueLiteral(BigDecimal("12.3").bigDecimal) + 'f1,
-      "12.3p + f1",
+      lit(BigDecimal("12.3").bigDecimal) + 'f1,
       "12.3 + f1",
       "123456789123456789123456801.3")
 
     testAllApis(
       'f1 + 'f1,
       "f1 + f1",
-      "f1 + f1",
       "246913578246913578246913578")
 
     testAllApis(
       'f1 - 'f1,
       "f1 - f1",
-      "f1 - f1",
       "0")
 
     // exceeds max precision 38.
@@ -252,20 +211,17 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f1 / 'f1,
       "f1 / f1",
-      "f1 / f1",
       "1.00000000")
     // Decimal(30,0) / Decimal(30, 0) => Decimal(61,31) => Decimal(38,8)
 
     testAllApis(
       'f1 % 'f1,
-      "f1 % f1",
       "MOD(f1, f1)",
       "0")
 
     testAllApis(
       -'f0,
       "-f0",
-      "-f0",
       "-123456789.123456789123456789")
   }
 
@@ -274,49 +230,41 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f1 < 12,
       "f1 < 12",
-      "f1 < 12",
       "false")
 
     testAllApis(
       'f1 > 12,
       "f1 > 12",
-      "f1 > 12",
       "true")
 
     testAllApis(
       'f1 === 12,
-      "f1 === 12",
       "f1 = 12",
       "false")
 
     testAllApis(
       'f5 === 0,
-      "f5 === 0",
       "f5 = 0",
       "true")
 
     testAllApis(
       'f1 === BigDecimal("123456789123456789123456789"),
-      "f1 === 123456789123456789123456789p",
       "f1 = CAST('123456789123456789123456789' AS DECIMAL(30, 0))",
       "true")
 
     testAllApis(
       'f1 !== BigDecimal("123456789123456789123456789"),
-      "f1 !== 123456789123456789123456789p",
       "f1 <> CAST('123456789123456789123456789' AS DECIMAL(30, 0))",
       "false")
 
     testAllApis(
       'f4 < 'f0,
       "f4 < f0",
-      "f4 < f0",
       "true")
 
     testAllApis(
       12.toExpr < 'f1,
       "12 < f1",
-      "12 < f1",
       "true")
 
     testAllApis(
@@ -328,94 +276,80 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       12.toExpr - 'f37,
       "12 - f37",
-      "12 - f37",
       "10")
 
     testAllApis(
       12.toExpr + 'f37,
       "12 + f37",
-      "12 + f37",
       "14")
 
     testAllApis(
       12.toExpr * 'f37,
       "12 * f37",
-      "12 * f37",
       "24")
 
     testAllApis(
       12.toExpr / 'f37,
       "12 / f37",
-      "12 / f37",
       "6")
   }
 
   @Test
-  def testFieldAcess(): Unit = {
+  def testFieldAccess(): Unit = {
 
     // the most basic case
     testAllApis(
       'f6,
       "f6",
-      "f6",
       "123")
 
     testAllApis(
       'f7,
       "f7",
-      "f7",
       "123.45")
 
     // data from source are rounded to their declared scale before entering next step
     testAllApis(
       'f8,
       "f8",
-      "f8",
       "100.00")
 
     testAllApis(
       'f8 + 'f8,
       "f8 + f8",
-      "f8 + f8",
       "200.00")
 
     // trailing zeros are padded to the scale
     testAllApis(
       'f9,
       "f9",
-      "f9",
       "100.10")
 
     testAllApis(
       'f9 + 'f9,
       "f9 + f9",
-      "f9 + f9",
       "200.20")
 
     // source data is within precision after rounding
     testAllApis(
       'f10,
       "f10",
-      "f10",
       "100.00")
 
     testAllApis(
       'f10 + 'f10,
       "f10 + f10",
-      "f10 + f10",
       "200.00")
 
     // source data overflows over precision (after rounding)
     testAllApis(
       'f11,
       "f11",
-      "f11",
       "null")
 
     testAllApis(
       'f12,
       "f12",
-      "f12",
       "null")
   }
 
@@ -425,7 +359,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       + 'f6,
       "+f6",
-      "+f6",
       "123")
 
     testAllApis(
@@ -437,7 +370,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       - (( + 'f6) - ( - 'f7)),
       "- (( + f6) - ( - f7))",
-      "- (( + f6) - ( - f7))",
       "-246.45")
   }
 
@@ -450,13 +382,11 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f13 + 'f14,
       "f13 + f14",
-      "f13 + f14",
       "300.2434")
 
     testAllApis(
       'f13 - 'f14,
       "f13 - f14",
-      "f13 - f14",
       "-100.0034")
 
     // INT => DECIMAL(10,0)
@@ -464,25 +394,21 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f7 + 'f2,
       "f7 + f2",
-      "f7 + f2",
       "165.45")
 
     testAllApis(
       'f2 + 'f7,
       "f2 + f7",
-      "f2 + f7",
       "165.45")
 
     testAllApis(
       'f7 + 'f3,
       "f7 + f3",
-      "f7 + f3",
       "127.65")
 
     testAllApis(
       'f3 + 'f7,
       "f3 + f7",
-      "f3 + f7",
       "127.65")
 
     // our result type precision is capped at 38
@@ -494,40 +420,34 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f15 + 'f16,
       "f15 + f16",
-      "f15 + f16",
       "300.0246913578012345678901234567")
 
     testAllApis(
       'f15 - 'f16,
       "f15 - f16",
-      "f15 - f16",
       "-100.0000000000012345678901234567")
 
     // 10 digits integral part
     testAllApis(
       'f17 + 'f18,
       "f17 + f18",
-      "f17 + f18",
       "null")
 
     testAllApis(
       'f17 - 'f18,
       "f17 - f18",
-      "f17 - f18",
       "null")
 
     // requires 39 digits
     testAllApis(
       'f19 + 'f19,
       "f19 + f19",
-      "f19 + f19",
       "null")
 
     // overflows in subexpression
     testAllApis(
       'f19 + 'f19 - 'f19,
       "f19 + f19 - f19",
-      "f19 + f19 - f19",
       "null")
   }
 
@@ -541,13 +461,11 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f20 * 'f20,
       "f20 * f20",
-      "f20 * f20",
       "1.0000")
 
     testAllApis(
       'f20 * 'f21,
       "f20 * f21",
-      "f20 * f21",
       "2.000000")
 
     // INT => DECIMAL(10,0)
@@ -555,25 +473,21 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f20 * 'f22,
       "f20 * f22",
-      "f20 * f22",
       "200.00")
 
     testAllApis(
       'f22 * 'f20,
       "f22 * f20",
-      "f22 * f20",
       "200.00")
 
     testAllApis(
       'f20 * 'f23,
       "f20 * f23",
-      "f20 * f23",
       "3.14")
 
     testAllApis(
       'f23 * 'f20,
       "f23 * f20",
-      "f23 * f20",
       "3.14")
 
     // precision is capped at 38; scale will not be reduced (unless over 38)
@@ -581,19 +495,16 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f24 * 'f24,
       "f24 * f24",
-      "f24 * f24",
       "1.000000000000")
 
     testAllApis(
       'f24 * 'f25,
       "f24 * f25",
-      "f24 * f25",
       "2.0000000000000000")
 
     testAllApis(
       'f26 * 'f26,
       "f26 * f26",
-      "f26 * f26",
       "0.00010000000000000000000000000000000000"
     )
 
@@ -605,7 +516,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f27 * 'f28,
       "f27 * f28",
-      "f27 * f28",
       "0.00000060000000000000"
     )
 
@@ -613,7 +523,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f29 * 'f29,
       "f29 * f29",
-      "f29 * f29",
       "null"
     )
 
@@ -621,7 +530,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f30 * 'f30,
       "f30 * f30",
-      "f30 * f30",
       "null"
     )
   }
@@ -634,13 +542,11 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f31 / 'f32,
       "f31 / f32",
-      "f31 / f32",
       "0.333333")
 
     testAllApis(
       'f31 / 'f33,
       "f31 / f33",
-      "f31 / f33",
       "0.3333333")
 
     testAllApis(
@@ -652,7 +558,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f31 / 'f35,
       "f31 / f35",
-      "f31 / f35",
       "0.333333")
 
     // INT => DECIMAL(10,0)
@@ -660,34 +565,29 @@ class DecimalTypeTest extends ExpressionTestBase {
     testAllApis(
       'f36 / 'f37,
       "f36 / f37",
-      "f36 / f37",
       "0.5000000000000")
 
 
     testAllApis(
       'f37 / 'f36,
       "f37 / f36",
-      "f37 / f36",
       "2.00000000000")
 
 
     testAllApis(
       'f36 / 'f38,
       "f36 / f38",
-      "f36 / f38",
       (1.0/3.0).toString)
 
     testAllApis(
       'f38 / 'f36,
       "f38 / f36",
-      "f38 / f36",
       (3.0/1.0).toString)
 
     // result overflow, because result type integral part is reduced
     testAllApis(
       'f39 / 'f40,
       "f39 / f40",
-      "f39 / f40",
       "null")
   }
 
@@ -696,50 +596,42 @@ class DecimalTypeTest extends ExpressionTestBase {
     // MOD(Exact1, Exact2) => Exact2
     testAllApis(
       'f41 % 'f42,
-      "f41 % f42",
       "mod(f41, f42)",
       "3.0000")
 
     testAllApis(
       'f42 % 'f41,
-      "f42 % f41",
       "mod(f42, f41)",
       "2.0000")
 
     testAllApis(
       'f41 % 'f43,
-      "f41 % f43",
       "mod(f41, f43)",
       "3.00")
 
     testAllApis(
       'f43 % 'f41,
-      "f43 % f41",
       "mod(f43, f41)",
       "1.00")
 
     // signs. consistent with Java's % operator.
     testAllApis(
       'f44 % 'f45,
-      "f44 % f45",
       "mod(f44, f45)",
       (3%5).toString)
 
     testAllApis(
       -'f44 % 'f45,
-      "-f44 % f45",
       "mod(-f44, f45)",
       ((-3)%5).toString)
 
     testAllApis(
       'f44 % -'f45,
-      "f44 % -f45",
       "mod(f44, -f45)",
       (3%(-5)).toString)
 
     testAllApis(
       -'f44 % -'f45,
-      "-f44 % -f45",
       "mod(-f44, -f45)",
       ((-3)%(-5)).toString)
 
@@ -747,7 +639,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     // (In T-SQL, s2 is expanded to s1, so that there's no rounding.)
     testAllApis(
       'f46 % 'f47,
-      "f46 % f47",
       "mod(f46, f47)",
       "3.1234")
   }
@@ -757,34 +648,29 @@ class DecimalTypeTest extends ExpressionTestBase {
 
     testAllApis(
       ifThenElse('f48 > 'f49, 'f48, 'f49),
-      "ifThenElse(greaterThan(f48, f49), f48, f49)",
       "if(f48 > f49, f48, f49)",
       "3.14")
 
     testAllApis(
       'f48.abs(),
-      "f48.abs()",
       "abs(f48)",
       "3.14"
     )
 
     testAllApis(
       (-'f48).abs(),
-      "(-f48).abs()",
       "abs(-f48)",
       "3.14"
     )
 
     testAllApis(
       'f48.floor(),
-      "f48.floor()",
       "floor(f48)",
       "3"
     )
 
     testAllApis(
       'f48.ceil(),
-      "f48.ceil()",
       "ceil(f48)",
       "4"
     )
@@ -792,20 +678,17 @@ class DecimalTypeTest extends ExpressionTestBase {
     // calcite: SIGN(Decimal(p,s))=>Decimal(p,s)
     testAllApis(
       'f48.sign(),
-      "f48.sign()",
       "sign(f48)",
       "1.00"
     )
 
     testAllApis(
       (-'f48).sign(),
-      "(-f48).sign()",
       "sign(-f48)",
       "-1.00"
     )
     testAllApis(
       ('f48 - 'f48).sign(),
-      "(f48 - f48).sign()",
       "sign(f48 - f48)",
       "0.00"
     )
@@ -813,19 +696,16 @@ class DecimalTypeTest extends ExpressionTestBase {
     // ROUND(Decimal(p,s)[,INT])
     testAllApis(
       'f50.round(0),
-      "f50.round(0)",
       "round(f50)",
       "647")
 
     testAllApis(
       'f50.round(0),
-      "f50.round(0)",
       "round(f50,0)",
       "647")
 
     testAllApis(
       'f50.round(1),
-      "f50.round(1)",
       "round(f50,1)",
       "646.6")
 
@@ -837,67 +717,56 @@ class DecimalTypeTest extends ExpressionTestBase {
 
     testAllApis(
       'f50.round(3),
-      "f50.round(3)",
       "round(f50,3)",
       "646.646")
 
     testAllApis(
       'f50.round(4),
-      "f50.round(4)",
       "round(f50,4)",
       "646.646")
 
     testAllApis(
       'f50.round(-1),
-      "f50.round(-1)",
       "round(f50,-1)",
       "650")
 
     testAllApis(
       'f50.round(-2),
-      "f50.round(-2)",
       "round(f50,-2)",
       "600")
 
     testAllApis(
       'f50.round(-3),
-      "f50.round(-3)",
       "round(f50,-3)",
       "1000")
 
     testAllApis(
       'f50.round(-4),
-      "f50.round(-4)",
       "round(f50,-4)",
       "0")
 
     testAllApis(
       'f51.round(1),
-      "f51.round(1)",
       "round(f51,1)",
       "100.0")
 
     testAllApis(
       (-'f51).round(1),
-      "(-f51).round(1)",
       "round(-f51,1)",
       "-100.0")
 
     testAllApis(
       ('f51).round(-1),
-      "(f51).round(-1)",
       "round(f51,-1)",
       "100")
 
     testAllApis(
       (-'f51).round(-1),
-      "(-f51).round(-1)",
       "round(-f51,-1)",
       "-100")
 
     testAllApis(
       ('f52).round(-1),
-      "(f52).round(-1)",
       "round(f52,-1)",
       "null")
   }
@@ -1236,8 +1105,6 @@ class DecimalTypeTest extends ExpressionTestBase {
     testData.setField(3, 4.2)
     testData.setField(4, BigDecimal("123456789").bigDecimal)
     testData.setField(5, BigDecimal("0.000").bigDecimal)
-
-    //convert ITCase to unit Test
     testData.setField(6, BigDecimal("123").bigDecimal)
     testData.setField(7, BigDecimal("123.45").bigDecimal)
     testData.setField(8, BigDecimal("100.004").bigDecimal)
@@ -1299,93 +1166,87 @@ class DecimalTypeTest extends ExpressionTestBase {
     testData.setField(64, BigDecimal("1").bigDecimal)
     testData.setField(65, 1)
     testData.setField(66, 1.0)
-
     testData.setField(67, BigDecimal("1").bigDecimal)
     testData.setField(68, BigDecimal("99").bigDecimal)
     testData.setField(69, 99)
     testData.setField(70, 99.0)
 
-
-
     testData
   }
 
-  override def typeInfo: RowTypeInfo = {
-    new RowTypeInfo(
-      /* 0 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 18)),
-      /* 1 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 0)),
-      /* 2 */ Types.INT(),
-      /* 3 */ Types.DOUBLE(),
-      /* 4 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 0)),
-      /* 5 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 3)),
-
-      //convert ITCase to unit Test
-      /* 6 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 0)),
-      /* 7 */ fromLogicalTypeToTypeInfo(DECIMAL(7, 2)),
-      /* 8 */ fromLogicalTypeToTypeInfo(DECIMAL(7, 2)),
-      /* 9 */ fromLogicalTypeToTypeInfo(DECIMAL(7, 2)),
-      /* 10 */ fromLogicalTypeToTypeInfo(DECIMAL(5, 2)),
-      /* 11 */ fromLogicalTypeToTypeInfo(DECIMAL(2, 0)),
-      /* 12 */ fromLogicalTypeToTypeInfo(DECIMAL(4, 2)),
-      /* 13 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 14 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 4)),
-      /* 15 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
-      /* 16 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 28)),
-      /* 17 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
-      /* 18 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 28)),
-      /* 19 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 0)),
-      /* 20 */ fromLogicalTypeToTypeInfo(DECIMAL(5, 2)),
-      /* 21 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 4)),
-      /* 22 */ Types.INT(),
-      /* 23 */ Types.DOUBLE(),
-      /* 24 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 6)),
-      /* 25 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 10)),
-      /* 26 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 20)),
-      /* 27 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
-      /* 28 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
-      /* 29 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 0)),
-      /* 30 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 20)),
-      /* 31 */ fromLogicalTypeToTypeInfo(DECIMAL(20, 2)),
-      /* 32 */ fromLogicalTypeToTypeInfo(DECIMAL(2, 1)),
-      /* 33 */ fromLogicalTypeToTypeInfo(DECIMAL(4, 3)),
-      /* 34 */ fromLogicalTypeToTypeInfo(DECIMAL(20, 10)),
-      /* 35 */ fromLogicalTypeToTypeInfo(DECIMAL(20, 16)),
-      /* 36 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 37 */ Types.INT(),
-      /* 38 */ Types.DOUBLE(),
-      /* 39 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 0)),
-      /* 40 */ fromLogicalTypeToTypeInfo(DECIMAL(30, 20)),
-      /* 41 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 42 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 4)),
-      /* 43 */ Types.INT(),
-      /* 44 */ fromLogicalTypeToTypeInfo(DECIMAL(1, 0)),
-      /* 45 */ fromLogicalTypeToTypeInfo(DECIMAL(1, 0)),
-      /* 46 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 4)),
-      /* 47 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 48 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 49 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 50 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 3)),
-      /* 51 */ fromLogicalTypeToTypeInfo(DECIMAL(4, 2)),
-      /* 52 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 0)),
-      /* 53 */ fromLogicalTypeToTypeInfo(DECIMAL(8, 4)),
-      /* 54 */ fromLogicalTypeToTypeInfo(DECIMAL(10, 2)),
-      /* 55 */ Types.STRING(),
-      /* 56 */ fromLogicalTypeToTypeInfo(DECIMAL(8, 2)),
-      /* 57 */ Types.DOUBLE(),
-      /* 58 */ Types.STRING(),
-      /* 59 */ Types.STRING(),
-      /* 60 */ fromLogicalTypeToTypeInfo(DECIMAL(4, 2)),
-      /* 61 */ Types.STRING(),
-      /* 62 */ Types.INT(),
-      /* 63 */ fromLogicalTypeToTypeInfo(DECIMAL(8, 2)),
-      /* 64 */ fromLogicalTypeToTypeInfo(DECIMAL(8, 4)),
-      /* 65 */ Types.INT(),
-      /* 66 */ Types.DOUBLE(),
-
-      /* 67 */ fromLogicalTypeToTypeInfo(DECIMAL(1, 0)),
-      /* 68 */ fromLogicalTypeToTypeInfo(DECIMAL(2, 0)),
-      /* 69 */ Types.INT(),
-      /* 70 */ Types.DOUBLE()
-    )
-  }
+  override def testDataType: DataType = DataTypes.ROW(
+    DataTypes.FIELD("f0", DataTypes.DECIMAL(30, 18)),
+    DataTypes.FIELD("f1", DataTypes.DECIMAL(30, 0)),
+    DataTypes.FIELD("f2", DataTypes.INT()),
+    DataTypes.FIELD("f3", DataTypes.DOUBLE()),
+    DataTypes.FIELD("f4", DataTypes.DECIMAL(10, 0)),
+    DataTypes.FIELD("f5", DataTypes.DECIMAL(10, 3)),
+    DataTypes.FIELD("f6", DataTypes.DECIMAL(10, 0)),
+    DataTypes.FIELD("f7", DataTypes.DECIMAL(7, 2)),
+    DataTypes.FIELD("f8", DataTypes.DECIMAL(7, 2)),
+    DataTypes.FIELD("f9", DataTypes.DECIMAL(7, 2)),
+    DataTypes.FIELD("f10", DataTypes.DECIMAL(5, 2)),
+    DataTypes.FIELD("f11", DataTypes.DECIMAL(2, 0)),
+    DataTypes.FIELD("f12", DataTypes.DECIMAL(4, 2)),
+    DataTypes.FIELD("f13", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f14", DataTypes.DECIMAL(10, 4)),
+    DataTypes.FIELD("f15", DataTypes.DECIMAL(38, 10)),
+    DataTypes.FIELD("f16", DataTypes.DECIMAL(38, 28)),
+    DataTypes.FIELD("f17", DataTypes.DECIMAL(38, 10)),
+    DataTypes.FIELD("f18", DataTypes.DECIMAL(38, 28)),
+    DataTypes.FIELD("f19", DataTypes.DECIMAL(38, 0)),
+    DataTypes.FIELD("f20", DataTypes.DECIMAL(5, 2)),
+    DataTypes.FIELD("f21", DataTypes.DECIMAL(10, 4)),
+    DataTypes.FIELD("f22", DataTypes.INT()),
+    DataTypes.FIELD("f23", DataTypes.DOUBLE()),
+    DataTypes.FIELD("f24", DataTypes.DECIMAL(30, 6)),
+    DataTypes.FIELD("f25", DataTypes.DECIMAL(30, 10)),
+    DataTypes.FIELD("f26", DataTypes.DECIMAL(30, 20)),
+    DataTypes.FIELD("f27", DataTypes.DECIMAL(38, 10)),
+    DataTypes.FIELD("f28", DataTypes.DECIMAL(38, 10)),
+    DataTypes.FIELD("f29", DataTypes.DECIMAL(38, 0)),
+    DataTypes.FIELD("f30", DataTypes.DECIMAL(30, 20)),
+    DataTypes.FIELD("f31", DataTypes.DECIMAL(20, 2)),
+    DataTypes.FIELD("f32", DataTypes.DECIMAL(2, 1)),
+    DataTypes.FIELD("f33", DataTypes.DECIMAL(4, 3)),
+    DataTypes.FIELD("f34", DataTypes.DECIMAL(20, 10)),
+    DataTypes.FIELD("f35", DataTypes.DECIMAL(20, 16)),
+    DataTypes.FIELD("f36", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f37", DataTypes.INT()),
+    DataTypes.FIELD("f38", DataTypes.DOUBLE()),
+    DataTypes.FIELD("f39", DataTypes.DECIMAL(30, 0)),
+    DataTypes.FIELD("f40", DataTypes.DECIMAL(30, 20)),
+    DataTypes.FIELD("f41", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f42", DataTypes.DECIMAL(10, 4)),
+    DataTypes.FIELD("f43", DataTypes.INT()),
+    DataTypes.FIELD("f44", DataTypes.DECIMAL(1, 0)),
+    DataTypes.FIELD("f45", DataTypes.DECIMAL(1, 0)),
+    DataTypes.FIELD("f46", DataTypes.DECIMAL(10, 4)),
+    DataTypes.FIELD("f47", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f48", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f49", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f50", DataTypes.DECIMAL(10, 3)),
+    DataTypes.FIELD("f51", DataTypes.DECIMAL(4, 2)),
+    DataTypes.FIELD("f52", DataTypes.DECIMAL(38, 0)),
+    DataTypes.FIELD("f53", DataTypes.DECIMAL(8, 4)),
+    DataTypes.FIELD("f54", DataTypes.DECIMAL(10, 2)),
+    DataTypes.FIELD("f55", DataTypes.STRING()),
+    DataTypes.FIELD("f56", DataTypes.DECIMAL(8, 2)),
+    DataTypes.FIELD("f57", DataTypes.DOUBLE()),
+    DataTypes.FIELD("f58", DataTypes.STRING()),
+    DataTypes.FIELD("f59", DataTypes.STRING()),
+    DataTypes.FIELD("f60", DataTypes.DECIMAL(4, 2)),
+    DataTypes.FIELD("f61", DataTypes.STRING()),
+    DataTypes.FIELD("f62", DataTypes.INT()),
+    DataTypes.FIELD("f63", DataTypes.DECIMAL(8, 2)),
+    DataTypes.FIELD("f64", DataTypes.DECIMAL(8, 4)),
+    DataTypes.FIELD("f65", DataTypes.INT()),
+    DataTypes.FIELD("f66", DataTypes.DOUBLE()),
+    DataTypes.FIELD("f67", DataTypes.DECIMAL(1, 0)),
+    DataTypes.FIELD("f68", DataTypes.DECIMAL(2, 0)),
+    DataTypes.FIELD("f69", DataTypes.INT()),
+    DataTypes.FIELD("f70", DataTypes.DOUBLE())
+  )
+
+  override def containsLegacyTypes: Boolean = false
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala
index 8935394..b78f6cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/RowTypeTest.scala
@@ -133,8 +133,6 @@ class RowTypeTest extends RowTypeTestBase {
   @Test
   def testUnsupportedCastTableApi(): Unit = {
     expectedException.expect(classOf[ValidationException])
-    expectedException.expectMessage(
-      "Unsupported cast from 'Row(f0: String, f1: Boolean)' to 'Long'")
 
     testTableApi(
       'f5.cast(DataTypes.BIGINT()),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 8040704..f09aee1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -18,65 +18,55 @@
 
 package org.apache.flink.table.planner.expressions
 
-import org.apache.flink.api.common.typeinfo.Types
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.time.{Instant, ZoneId, ZoneOffset}
+import java.util.{Locale, TimeZone}
+
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.TimeIntervalUnit
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
-import org.apache.flink.table.runtime.typeutils.{LegacyInstantTypeInfo, LegacyLocalDateTimeTypeInfo}
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.planner.{JInt, JLong}
+import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
-
 import org.junit.Test
 
-import java.sql.Timestamp
-import java.text.SimpleDateFormat
-import java.time.{Instant, ZoneId, ZoneOffset}
-import java.util.{Locale, TimeZone}
-
 class TemporalTypesTest extends ExpressionTestBase {
 
   @Test
   def testTimePointLiterals(): Unit = {
     testAllApis(
       "1990-10-14".toDate,
-      "'1990-10-14'.toDate",
       "DATE '1990-10-14'",
       "1990-10-14")
 
     testTableApi(
       localDate2Literal(localDate("2040-09-11")),
-      "'2040-09-11'.toDate",
       "2040-09-11")
 
     testAllApis(
       "1500-04-30".cast(DataTypes.DATE),
-      "'1500-04-30'.cast(SQL_DATE)",
       "CAST('1500-04-30' AS DATE)",
       "1500-04-30")
 
     testAllApis(
       "15:45:59".toTime,
-      "'15:45:59'.toTime",
       "TIME '15:45:59'",
       "15:45:59")
 
     testTableApi(
       localTime2Literal(DateTimeTestUtil.localTime("00:00:00")),
-      "'00:00:00'.toTime",
       "00:00:00")
 
     testAllApis(
       "1:30:00".cast(DataTypes.TIME),
-      "'1:30:00'.cast(SQL_TIME)",
       "CAST('1:30:00' AS TIME)",
       "01:30:00")
 
     testAllApis(
       "1990-10-14 23:00:00.123".toTimestamp,
-      "'1990-10-14 23:00:00.123'.toTimestamp",
       "TIMESTAMP '1990-10-14 23:00:00.123'",
       "1990-10-14 23:00:00.123")
 
@@ -86,7 +76,6 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       "1500-04-30 12:00:00".cast(DataTypes.TIMESTAMP(3)),
-      "'1500-04-30 12:00:00'.cast(SQL_TIMESTAMP)",
       "CAST('1500-04-30 12:00:00' AS TIMESTAMP(3))",
       "1500-04-30 12:00:00.000")
 
@@ -128,43 +117,36 @@ class TemporalTypesTest extends ExpressionTestBase {
   def testTimeIntervalLiterals(): Unit = {
     testAllApis(
       1.year,
-      "1.year",
       "INTERVAL '1' YEAR",
       "+1-00")
 
     testAllApis(
       1.month,
-      "1.month",
       "INTERVAL '1' MONTH",
       "+0-01")
 
     testAllApis(
       12.days,
-      "12.days",
       "INTERVAL '12' DAY",
       "+12 00:00:00.000")
 
     testAllApis(
       1.hour,
-      "1.hour",
       "INTERVAL '1' HOUR",
       "+0 01:00:00.000")
 
     testAllApis(
       3.minutes,
-      "3.minutes",
       "INTERVAL '3' MINUTE",
       "+0 00:03:00.000")
 
     testAllApis(
       3.seconds,
-      "3.seconds",
       "INTERVAL '3' SECOND",
       "+0 00:00:03.000")
 
     testAllApis(
       3.millis,
-      "3.millis",
       "INTERVAL '0.003' SECOND",
       "+0 00:00:00.003")
   }
@@ -174,19 +156,16 @@ class TemporalTypesTest extends ExpressionTestBase {
     testAllApis(
       'f0,
       "f0",
-      "f0",
       "1990-10-14")
 
     testAllApis(
       'f1,
       "f1",
-      "f1",
       "10:20:45")
 
     testAllApis(
       'f2,
       "f2",
-      "f2",
       "1990-10-14 10:20:45.123")
   }
 
@@ -195,13 +174,11 @@ class TemporalTypesTest extends ExpressionTestBase {
     testAllApis(
       'f9,
       "f9",
-      "f9",
       "+2-00")
 
     testAllApis(
       'f10,
       "f10",
-      "f10",
       "+0 00:00:12.000")
   }
 
@@ -209,69 +186,29 @@ class TemporalTypesTest extends ExpressionTestBase {
   def testTimePointCasting(): Unit = {
     testAllApis(
       'f0.cast(DataTypes.TIMESTAMP(3)),
-      "f0.cast(SQL_TIMESTAMP)",
       "CAST(f0 AS TIMESTAMP(3))",
       "1990-10-14 00:00:00.000")
 
     testAllApis(
       'f1.cast(DataTypes.TIMESTAMP(3)),
-      "f1.cast(SQL_TIMESTAMP)",
       "CAST(f1 AS TIMESTAMP(3))",
       "1970-01-01 10:20:45.000")
 
     testAllApis(
       'f2.cast(DataTypes.DATE),
-      "f2.cast(SQL_DATE)",
       "CAST(f2 AS DATE)",
       "1990-10-14")
 
     testAllApis(
       'f2.cast(DataTypes.TIME),
-      "f2.cast(SQL_TIME)",
       "CAST(f2 AS TIME)",
       "10:20:45")
 
     testAllApis(
       'f2.cast(DataTypes.TIME),
-      "f2.cast(SQL_TIME)",
       "CAST(f2 AS TIME)",
       "10:20:45")
 
-    testTableApi(
-      'f7.cast(DataTypes.DATE),
-      "f7.cast(SQL_DATE)",
-      "2002-11-09")
-
-    testTableApi(
-      'f7.cast(DataTypes.DATE).cast(DataTypes.INT),
-      "f7.cast(SQL_DATE).cast(INT)",
-      "12000")
-
-    testTableApi(
-      'f7.cast(DataTypes.TIME),
-      "f7.cast(SQL_TIME)",
-      "00:00:12")
-
-    testTableApi(
-      'f7.cast(DataTypes.TIME).cast(DataTypes.INT),
-      "f7.cast(SQL_TIME).cast(INT)",
-      "12000")
-
-    testTableApi(
-      'f15.cast(DataTypes.TIMESTAMP(3)),
-      "f15.cast(SQL_TIMESTAMP)",
-      "2016-06-27 07:23:33.000")
-
-    testTableApi(
-      'f15.toTimestamp,
-      "f15.toTimestamp",
-      "2016-06-27 07:23:33.000")
-
-    testTableApi(
-      'f8.cast(DataTypes.TIMESTAMP(3)).cast(DataTypes.BIGINT()),
-      "f8.cast(SQL_TIMESTAMP).cast(LONG)",
-      "1467012213000")
-
     testSqlApi(
       "CAST(CAST('123' as DECIMAL(5, 2)) AS TIMESTAMP)",
       "1970-01-01 00:02:03.000000")
@@ -329,12 +266,10 @@ class TemporalTypesTest extends ExpressionTestBase {
   def testTimeIntervalCasting(): Unit = {
     testTableApi(
       'f7.cast(DataTypes.INTERVAL(DataTypes.MONTH)),
-      "f7.cast(INTERVAL_MONTHS)",
       "+1000-00")
 
     testTableApi(
       'f8.cast(DataTypes.INTERVAL(DataTypes.MINUTE())),
-      "f8.cast(INTERVAL_MILLIS)",
       "+16979 07:23:33.000")
   }
 
@@ -343,31 +278,26 @@ class TemporalTypesTest extends ExpressionTestBase {
     testAllApis(
       'f0 < 'f3,
       "f0 < f3",
-      "f0 < f3",
       "false")
 
     testAllApis(
       'f0 < 'f4,
       "f0 < f4",
-      "f0 < f4",
       "true")
 
     testAllApis(
       'f1 < 'f5,
       "f1 < f5",
-      "f1 < f5",
       "false")
 
     testAllApis(
       'f0.cast(DataTypes.TIMESTAMP(3)) !== 'f2,
-      "f0.cast(SQL_TIMESTAMP) !== f2",
-      "CAST(f0 AS TIMESTAMP(3)) <> f2",
+      "CAST(f0 AS TIMESTAMP(9)) <> f2",
       "true")
 
     testAllApis(
-      'f0.cast(DataTypes.TIMESTAMP(3)) === 'f6,
-      "f0.cast(SQL_TIMESTAMP) === f6",
-      "CAST(f0 AS TIMESTAMP(3)) = f6",
+      'f0.cast(DataTypes.TIMESTAMP(9)) === 'f6,
+      "CAST(f0 AS TIMESTAMP(9)) = f6",
       "true")
   }
 
@@ -378,13 +308,11 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       12.months < 24.months,
-      "12.months < 24.months",
       "INTERVAL '12' MONTH < INTERVAL '24' MONTH",
       "true")
 
     testAllApis(
       8.years === 8.years,
-      "8.years === 8.years",
       "INTERVAL '8' YEAR = INTERVAL '8' YEAR",
       "true")
 
@@ -392,13 +320,11 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       8.millis > 10.millis,
-      "8.millis > 10.millis",
       "INTERVAL '0.008' SECOND > INTERVAL '0.010' SECOND",
       "false")
 
     testAllApis(
       8.millis === 8.millis,
-      "8.millis === 8.millis",
       "INTERVAL '0.008' SECOND = INTERVAL '0.008' SECOND",
       "true")
 
@@ -406,19 +332,16 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       8.years + 10.months,
-      "8.years + 10.months",
       "INTERVAL '8' YEAR + INTERVAL '10' MONTH",
       "+8-10")
 
     testAllApis(
       2.years - 12.months,
-      "2.years - 12.months",
       "INTERVAL '2' YEAR - INTERVAL '12' MONTH",
       "+1-00")
 
     testAllApis(
       -2.years,
-      "-2.years",
       "-INTERVAL '2' YEAR",
       "-2-00")
 
@@ -426,19 +349,16 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testAllApis(
       8.hours + 10.minutes + 12.seconds + 5.millis,
-      "8.hours + 10.minutes + 12.seconds + 5.millis",
       "INTERVAL '8' HOUR + INTERVAL '10' MINUTE + INTERVAL '12.005' SECOND",
       "+0 08:10:12.005")
 
     testAllApis(
       1.minute - 10.seconds,
-      "1.minute - 10.seconds",
       "INTERVAL '1' MINUTE - INTERVAL '10' SECOND",
       "+0 00:00:50.000")
 
     testAllApis(
       -10.seconds,
-      "-10.seconds",
       "-INTERVAL '10' SECOND",
       "-0 00:00:10.000")
 
@@ -447,28 +367,24 @@ class TemporalTypesTest extends ExpressionTestBase {
     // interval millis
     testAllApis(
       'f0 + 2.days,
-      "f0 + 2.days",
       "f0 + INTERVAL '2' DAY",
       "1990-10-16")
 
     // interval millis
     testAllApis(
       30.days + 'f0,
-      "30.days + f0",
       "INTERVAL '30' DAY + f0",
       "1990-11-13")
 
     // interval months
     testAllApis(
       'f0 + 2.months,
-      "f0 + 2.months",
       "f0 + INTERVAL '2' MONTH",
       "1990-12-14")
 
     // interval months
     testAllApis(
       2.months + 'f0,
-      "2.months + f0",
       "INTERVAL '2' MONTH + f0",
       "1990-12-14")
 
@@ -477,14 +393,12 @@ class TemporalTypesTest extends ExpressionTestBase {
     // interval millis
     testAllApis(
       'f1 + 12.hours,
-      "f1 + 12.hours",
       "f1 + INTERVAL '12' HOUR",
       "22:20:45")
 
     // interval millis
     testAllApis(
       12.hours + 'f1,
-      "12.hours + f1",
       "INTERVAL '12' HOUR + f1",
       "22:20:45")
 
@@ -493,28 +407,24 @@ class TemporalTypesTest extends ExpressionTestBase {
     // interval millis
     testAllApis(
       'f2 + 10.days + 4.millis,
-      "f2 + 10.days + 4.millis",
       "f2 + INTERVAL '10 00:00:00.004' DAY TO SECOND",
       "1990-10-24 10:20:45.127")
 
     // interval millis
     testAllApis(
       10.days + 'f2 + 4.millis,
-      "10.days + f2 + 4.millis",
       "INTERVAL '10 00:00:00.004' DAY TO SECOND + f2",
       "1990-10-24 10:20:45.127")
 
     // interval months
     testAllApis(
       'f2 + 10.years,
-      "f2 + 10.years",
       "f2 + INTERVAL '10' YEAR",
       "2000-10-14 10:20:45.123")
 
     // interval months
     testAllApis(
       10.years + 'f2,
-      "10.years + f2",
       "INTERVAL '10' YEAR + f2",
       "2000-10-14 10:20:45.123")
 
@@ -523,28 +433,24 @@ class TemporalTypesTest extends ExpressionTestBase {
     // interval millis
     testAllApis(
       'f0 - 2.days,
-      "f0 - 2.days",
       "f0 - INTERVAL '2' DAY",
       "1990-10-12")
 
     // interval millis
     testAllApis(
       -30.days + 'f0,
-      "-30.days + f0",
       "INTERVAL '-30' DAY + f0",
       "1990-09-14")
 
     // interval months
     testAllApis(
       'f0 - 2.months,
-      "f0 - 2.months",
       "f0 - INTERVAL '2' MONTH",
       "1990-08-14")
 
     // interval months
     testAllApis(
       -2.months + 'f0,
-      "-2.months + f0",
       "-INTERVAL '2' MONTH + f0",
       "1990-08-14")
 
@@ -553,14 +459,12 @@ class TemporalTypesTest extends ExpressionTestBase {
     // interval millis
     testAllApis(
       'f1 - 12.hours,
-      "f1 - 12.hours",
       "f1 - INTERVAL '12' HOUR",
       "22:20:45")
 
     // interval millis
     testAllApis(
       -12.hours + 'f1,
-      "-12.hours + f1",
       "INTERVAL '-12' HOUR + f1",
       "22:20:45")
 
@@ -569,68 +473,60 @@ class TemporalTypesTest extends ExpressionTestBase {
     // interval millis
     testAllApis(
       'f2 - 10.days - 4.millis,
-      "f2 - 10.days - 4.millis",
       "f2 - INTERVAL '10 00:00:00.004' DAY TO SECOND",
       "1990-10-04 10:20:45.119")
 
     // interval millis
     testAllApis(
       -10.days + 'f2 - 4.millis,
-      "-10.days + f2 - 4.millis",
       "INTERVAL '-10 00:00:00.004' DAY TO SECOND + f2",
       "1990-10-04 10:20:45.119")
 
     // interval months
     testAllApis(
       'f2 - 10.years,
-      "f2 - 10.years",
       "f2 - INTERVAL '10' YEAR",
       "1980-10-14 10:20:45.123")
 
     // interval months
     testAllApis(
       -10.years + 'f2,
-      "-10.years + f2",
       "INTERVAL '-10' YEAR + f2",
       "1980-10-14 10:20:45.123")
 
     // casting
 
     testAllApis(
-      -'f9.cast(DataTypes.INTERVAL(DataTypes.MONTH)),
-      "-f9.cast(INTERVAL_MONTHS)",
-      "-CAST(f9 AS INTERVAL YEAR)",
+      // TODO fix after FLIP-51
+      -'f9.cast(DataTypes.INTERVAL(DataTypes.MONTH).bridgedTo(classOf[JInt])),
+      "-CAST(f9 AS INTERVAL MONTH)",
       "-2-00")
 
     testAllApis(
-      -'f10.cast(DataTypes.INTERVAL(DataTypes.MINUTE())),
-      "-f10.cast(INTERVAL_MILLIS)",
-      "-CAST(f10 AS INTERVAL SECOND)",
+      // TODO fix after FLIP-51
+      -'f10.cast(DataTypes.INTERVAL(DataTypes.SECOND(3)).bridgedTo(classOf[JLong])),
+      "-CAST(f10 AS INTERVAL SECOND(3))",
       "-0 00:00:12.000")
 
     // addition/subtraction of interval millis and interval months
 
     testAllApis(
       'f0 + 2.days + 1.month,
-      "f0 + 2.days + 1.month",
       "f0 + INTERVAL '2' DAY + INTERVAL '1' MONTH",
       "1990-11-16")
 
     testAllApis(
       'f0 - 2.days - 1.month,
-      "f0 - 2.days - 1.month",
       "f0 - INTERVAL '2' DAY - INTERVAL '1' MONTH",
       "1990-09-12")
 
     testAllApis(
       'f2 + 2.days + 1.month,
-      "f2 + 2.days + 1.month",
       "f2 + INTERVAL '2' DAY + INTERVAL '1' MONTH",
       "1990-11-16 10:20:45.123")
 
     testAllApis(
       'f2 - 2.days - 1.month,
-      "f2 - 2.days - 1.month",
       "f2 - INTERVAL '2' DAY - INTERVAL '1' MONTH",
       "1990-09-12 10:20:45.123")
   }
@@ -640,35 +536,30 @@ class TemporalTypesTest extends ExpressionTestBase {
     testAllApis(
       'f11,
       "f11",
-      "f11",
       "null"
     )
     testAllApis(
       'f12,
       "f12",
-      "f12",
       "null"
     )
     testAllApis(
       'f13,
       "f13",
-      "f13",
       "null"
     )
   }
 
   @Test
-  def testTemporalNullValues() = {
+  def testTemporalNullValues(): Unit = {
     testAllApis(
       'f13.extract(TimeIntervalUnit.HOUR),
-      "f13.extract(HOUR)",
       "extract(HOUR FROM f13)",
       "null"
     )
 
     testAllApis(
       'f13.floor(TimeIntervalUnit.HOUR),
-      "f13.floor(HOUR)",
       "FLOOR(f13 TO HOUR)",
       "null"
     )
@@ -1280,39 +1171,40 @@ class TemporalTypesTest extends ExpressionTestBase {
     testData.setField(22, 3)
     testData.setField(23, localDateTime("1970-01-01 00:00:00.123456789")
       .atZone(config.getLocalTimeZone).toInstant)
-    testData.setField(24, localDateTime("1970-01-01 00:00:00.123456789"))
+    testData.setField(24, localDateTime("1970-01-01 00:00:00.123456789")
+      .atZone(config.getLocalTimeZone).toInstant)
     testData.setField(25, localDateTime("1970-01-01 00:00:00.123456789").toInstant(ZoneOffset.UTC))
     testData
   }
 
-  override def typeInfo: RowTypeInfo = {
-    new RowTypeInfo(
-      /* 0 */  Types.LOCAL_DATE,
-      /* 1 */  Types.LOCAL_TIME,
-      /* 2 */  Types.LOCAL_DATE_TIME,
-      /* 3 */  Types.LOCAL_DATE,
-      /* 4 */  Types.LOCAL_DATE,
-      /* 5 */  Types.LOCAL_TIME,
-      /* 6 */  Types.LOCAL_DATE_TIME,
-      /* 7 */  Types.INT,
-      /* 8 */  Types.LONG,
-      /* 9 */  TimeIntervalTypeInfo.INTERVAL_MONTHS,
-      /* 10 */ TimeIntervalTypeInfo.INTERVAL_MILLIS,
-      /* 11 */ Types.LOCAL_DATE,
-      /* 12 */ Types.LOCAL_TIME,
-      /* 13 */ Types.LOCAL_DATE_TIME,
-      /* 14 */ Types.STRING,
-      /* 15 */ Types.LONG,
-      /* 16 */ Types.INSTANT,
-      /* 17 */ Types.INSTANT,
-      /* 18 */ Types.INSTANT,
-      /* 19 */ Types.INSTANT,
-      /* 20 */ Types.INSTANT,
-      /* 21 */ Types.LONG,
-      /* 22 */ Types.INT,
-      /* 23 */ new LegacyInstantTypeInfo(9),
-      /* 24 */ new LegacyLocalDateTimeTypeInfo(9),
-      /* 25 */ Types.INSTANT
-    )
-  }
+  override def testDataType: DataType = DataTypes.ROW(
+    DataTypes.FIELD("f0", DataTypes.DATE()),
+    DataTypes.FIELD("f1", DataTypes.TIME(0)),
+    DataTypes.FIELD("f2", DataTypes.TIMESTAMP(3)),
+    DataTypes.FIELD("f3", DataTypes.DATE()),
+    DataTypes.FIELD("f4", DataTypes.DATE()),
+    DataTypes.FIELD("f5", DataTypes.TIME(0)),
+    DataTypes.FIELD("f6", DataTypes.TIMESTAMP(9)),
+    DataTypes.FIELD("f7", DataTypes.INT()),
+    DataTypes.FIELD("f8", DataTypes.BIGINT()),
+    DataTypes.FIELD("f9", DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(classOf[JInt])),
+    DataTypes.FIELD("f10", DataTypes.INTERVAL(DataTypes.SECOND(3)).bridgedTo(classOf[JLong])),
+    DataTypes.FIELD("f11", DataTypes.DATE()),
+    DataTypes.FIELD("f12", DataTypes.TIME(0)),
+    DataTypes.FIELD("f13", DataTypes.TIMESTAMP(9)),
+    DataTypes.FIELD("f14", DataTypes.STRING()),
+    DataTypes.FIELD("f15", DataTypes.BIGINT()),
+    DataTypes.FIELD("f16", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+    DataTypes.FIELD("f17", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+    DataTypes.FIELD("f18", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+    DataTypes.FIELD("f19", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+    DataTypes.FIELD("f20", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+    DataTypes.FIELD("f21", DataTypes.BIGINT()),
+    DataTypes.FIELD("f22", DataTypes.INT()),
+    DataTypes.FIELD("f23", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+    DataTypes.FIELD("f24", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+    DataTypes.FIELD("f25", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9))
+  )
+
+  override def containsLegacyTypes: Boolean = false
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index 5cdf3c0..94183b1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -198,26 +198,6 @@ class TemporalTypesTest extends ExpressionTestBase {
       "10:20:45")
 
     testTableApi(
-      'f7.cast(Types.SQL_DATE),
-      "f7.cast(SQL_DATE)",
-      "2002-11-09")
-
-    testTableApi(
-      'f7.cast(Types.SQL_DATE).cast(Types.INT),
-      "f7.cast(SQL_DATE).cast(INT)",
-      "12000")
-
-    testTableApi(
-      'f7.cast(Types.SQL_TIME),
-      "f7.cast(SQL_TIME)",
-      "00:00:12")
-
-    testTableApi(
-      'f7.cast(Types.SQL_TIME).cast(Types.INT),
-      "f7.cast(SQL_TIME).cast(INT)",
-      "12000")
-
-    testTableApi(
       'f8.cast(Types.SQL_TIMESTAMP),
       "f8.cast(SQL_TIMESTAMP)",
       "2016-06-27 07:23:33.000")


[flink] 02/04: [hotfix][table-common] Align explicit casting with Calcite's SqlTypeCoercionRule

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7dc419342779d08f6c1ae6adf42e58bdc6056971
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri May 29 08:18:43 2020 +0200

    [hotfix][table-common] Align explicit casting with Calcite's SqlTypeCoercionRule
---
 .../types/logical/utils/LogicalTypeCasts.java      | 33 ++++++++++++++--------
 .../flink/table/types/LogicalTypeCastsTest.java    |  2 +-
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index ea63e0a..bc8640c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -119,76 +119,85 @@ public final class LogicalTypeCasts {
 		castTo(CHAR)
 			.implicitFrom(CHAR)
 			.explicitFromFamily(PREDEFINED)
-			.explicitNotFromFamily(BINARY_STRING)
 			.build();
 
 		castTo(VARCHAR)
 			.implicitFromFamily(CHARACTER_STRING)
 			.explicitFromFamily(PREDEFINED)
-			.explicitNotFromFamily(BINARY_STRING)
 			.build();
 
 		castTo(BOOLEAN)
 			.implicitFrom(BOOLEAN)
-			.explicitFromFamily(CHARACTER_STRING)
+			.explicitFromFamily(CHARACTER_STRING, NUMERIC)
 			.build();
 
 		castTo(BINARY)
 			.implicitFrom(BINARY)
+			.explicitFromFamily(CHARACTER_STRING)
+			.explicitFrom(VARBINARY)
 			.build();
 
 		castTo(VARBINARY)
 			.implicitFromFamily(BINARY_STRING)
+			.explicitFromFamily(CHARACTER_STRING)
+			.explicitFrom(BINARY)
 			.build();
 
 		castTo(DECIMAL)
 			.implicitFromFamily(NUMERIC)
-			.explicitFromFamily(CHARACTER_STRING)
+			.explicitFromFamily(CHARACTER_STRING, INTERVAL)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(TINYINT)
 			.implicitFrom(TINYINT)
-			.explicitFromFamily(NUMERIC, CHARACTER_STRING)
+			.explicitFromFamily(NUMERIC, CHARACTER_STRING, INTERVAL)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(SMALLINT)
 			.implicitFrom(TINYINT, SMALLINT)
-			.explicitFromFamily(NUMERIC, CHARACTER_STRING)
+			.explicitFromFamily(NUMERIC, CHARACTER_STRING, INTERVAL)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(INTEGER)
 			.implicitFrom(TINYINT, SMALLINT, INTEGER)
-			.explicitFromFamily(NUMERIC, CHARACTER_STRING)
+			.explicitFromFamily(NUMERIC, CHARACTER_STRING, INTERVAL)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(BIGINT)
 			.implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT)
-			.explicitFromFamily(NUMERIC, CHARACTER_STRING)
+			.explicitFromFamily(NUMERIC, CHARACTER_STRING, INTERVAL)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(FLOAT)
 			.implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DECIMAL)
 			.explicitFromFamily(NUMERIC, CHARACTER_STRING)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(DOUBLE)
 			.implicitFromFamily(NUMERIC)
 			.explicitFromFamily(CHARACTER_STRING)
+			.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.build();
 
 		castTo(DATE)
 			.implicitFrom(DATE, TIMESTAMP_WITHOUT_TIME_ZONE)
-			.explicitFromFamily(TIMESTAMP, CHARACTER_STRING)
+			.explicitFromFamily(TIMESTAMP, CHARACTER_STRING, BINARY_STRING)
 			.build();
 
 		castTo(TIME_WITHOUT_TIME_ZONE)
 			.implicitFrom(TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE)
-			.explicitFromFamily(TIME, TIMESTAMP, CHARACTER_STRING)
+			.explicitFromFamily(TIME, TIMESTAMP, CHARACTER_STRING, BINARY_STRING)
 			.build();
 
 		castTo(TIMESTAMP_WITHOUT_TIME_ZONE)
 			.implicitFrom(TIMESTAMP_WITHOUT_TIME_ZONE)
-			.explicitFromFamily(DATETIME, CHARACTER_STRING)
+			.explicitFromFamily(DATETIME, CHARACTER_STRING, BINARY_STRING, NUMERIC)
 			.build();
 
 		castTo(TIMESTAMP_WITH_TIME_ZONE)
@@ -198,7 +207,7 @@ public final class LogicalTypeCasts {
 
 		castTo(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
 			.implicitFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
-			.explicitFromFamily(DATETIME, CHARACTER_STRING)
+			.explicitFromFamily(DATETIME, CHARACTER_STRING, BINARY_STRING, NUMERIC)
 			.build();
 
 		castTo(INTERVAL_YEAR_MONTH)
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
index 03fb1b7..8f79fe6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
@@ -158,7 +158,7 @@ public class LogicalTypeCastsTest {
 						)
 					),
 					false,
-					false
+					true
 				},
 
 				{


[flink] 01/04: [hotfix][table-planner-blink] Prepare ExpressionTestBase for new type system

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2f88660b2b0897e57565e5b197ba83f20a03228
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 26 16:47:06 2020 +0200

    [hotfix][table-planner-blink] Prepare ExpressionTestBase for new type system
---
 .../expressions/utils/ExpressionTestBase.scala     | 116 ++++++++++++++-------
 1 file changed, 80 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 8469857..71a7687 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -18,42 +18,42 @@
 
 package org.apache.flink.table.planner.expressions.utils
 
+import java.util.Collections
+
+import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
 import org.apache.flink.api.common.TaskInfo
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext
 import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMapFunction}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
+import org.apache.flink.table.data.conversion.{DataStructureConverter, DataStructureConverters}
 import org.apache.flink.table.data.util.DataFormatConverters
+import org.apache.flink.table.data.util.DataFormatConverters.DataFormatConverter
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
-import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.table.types.logical.{RowType, VarCharType}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
-
-import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.logical.{LogicalCalc, LogicalTableScan}
-import org.apache.calcite.rel.rules._
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-
 import org.junit.Assert.{assertEquals, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{After, Before, Rule}
 
-import java.util.Collections
-
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 abstract class ExpressionTestBase {
 
@@ -66,7 +66,13 @@ abstract class ExpressionTestBase {
   // use impl class instead of interface class to avoid
   // "Static methods in interface require -target:jvm-1.8"
   private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
-  private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
+    .asInstanceOf[StreamTableEnvironmentImpl]
+  private val resolvedDataType = if (containsLegacyTypes) {
+    TypeConversions.fromLegacyInfoToDataType(typeInfo)
+  } else {
+    tEnv.getCatalogManager.getDataTypeFactory.createDataType(testDataType)
+  }
+  private val planner = tEnv.getPlanner.asInstanceOf[PlannerBase]
   private val relBuilder = planner.getRelBuilder
   private val calcitePlanner = planner.createFlinkPlanner
   private val parser = planner.plannerContext.createCalciteParser()
@@ -82,13 +88,16 @@ abstract class ExpressionTestBase {
   @Rule
   def thrown: ExpectedException = expectedException
 
-  def functions: Map[String, ScalarFunction] = Map()
-
   @Before
   def prepare(): Unit = {
-    val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
-    tEnv.createTemporaryView(tableName, ds)
-    functions.foreach(f => tEnv.registerFunction(f._1, f._2))
+    if (containsLegacyTypes) {
+      val ds = env.fromCollection(Collections.emptyList[Row](), typeInfo)
+      tEnv.createTemporaryView(tableName, ds)
+      functions.foreach(f => tEnv.registerFunction(f._1, f._2))
+    } else {
+      tEnv.createTemporaryView(tableName, tEnv.fromValues(resolvedDataType))
+      testSystemFunctions.asScala.foreach(e => tEnv.createTemporarySystemFunction(e._1, e._2))
+    }
 
     // prepare RelBuilder
     relBuilder.scan(tableName)
@@ -100,7 +109,11 @@ abstract class ExpressionTestBase {
   @After
   def evaluateExprs(): Unit = {
     val ctx = CodeGeneratorContext(config)
-    val inputType = fromTypeInfoToLogicalType(typeInfo)
+    val inputType = if (containsLegacyTypes) {
+      fromTypeInfoToLogicalType(typeInfo)
+    } else {
+      resolvedDataType.getLogicalType
+    }
     val exprGenerator = new ExprCodeGenerator(ctx, nullableInput = false).bindInput(inputType)
 
     // cast expressions to String
@@ -145,10 +158,18 @@ abstract class ExpressionTestBase {
       richMapper.open(new Configuration())
     }
 
-    val converter = DataFormatConverters
-      .getConverterForDataType(dataType)
-      .asInstanceOf[DataFormatConverters.DataFormatConverter[RowData, Row]]
-    val testRow = converter.toInternal(testData)
+    val testRow = if (containsLegacyTypes) {
+      val converter = DataFormatConverters
+        .getConverterForDataType(resolvedDataType)
+        .asInstanceOf[DataFormatConverter[RowData, Row]]
+      converter.toInternal(testData)
+    } else {
+      val converter = DataStructureConverters
+        .getConverter(resolvedDataType)
+        .asInstanceOf[DataStructureConverter[RowData, Row]]
+      converter.toInternalOrNull(testData)
+    }
+
     val result = mapper.map(testRow)
 
     // call close method for RichFunction
@@ -194,7 +215,7 @@ abstract class ExpressionTestBase {
     val optimized = hep.findBestExp()
 
     // throw exception if plan contains more than a calc
-    if (!optimized.getInput(0).isInstanceOf[LogicalTableScan]) {
+    if (!optimized.getInput(0).getInputs.isEmpty) {
       fail("Expression is converted into more than a Calc operation. Use a different test method.")
     }
 
@@ -210,24 +231,14 @@ abstract class ExpressionTestBase {
 
   def testAllApis(
       expr: Expression,
-      exprString: String,
       sqlExpr: String,
       expected: String): Unit = {
     addTableApiTestExpr(expr, expected)
-    addTableApiTestExpr(exprString, expected)
     addSqlTestExpr(sqlExpr, expected)
   }
 
   def testTableApi(
       expr: Expression,
-      exprString: String,
-      expected: String): Unit = {
-    addTableApiTestExpr(expr, expected)
-    addTableApiTestExpr(exprString, expected)
-  }
-
-  def testTableApi(
-      expr: Expression,
       expected: String): Unit = {
     addTableApiTestExpr(expr, expected)
   }
@@ -252,8 +263,41 @@ abstract class ExpressionTestBase {
 
   def testData: Row
 
-  def typeInfo: RowTypeInfo
+  def testDataType: AbstractDataType[_] =
+    throw new IllegalArgumentException("Implement this if no legacy types are expected.")
+
+  def testSystemFunctions: java.util.Map[String, ScalarFunction] = Collections.emptyMap();
+
+  // ----------------------------------------------------------------------------------------------
+  // Legacy type system
+  // ----------------------------------------------------------------------------------------------
 
-  def dataType: DataType = TypeConversions.fromLegacyInfoToDataType(typeInfo)
+  def containsLegacyTypes: Boolean = true
 
+  @deprecated
+  def functions: Map[String, ScalarFunction] = Map()
+
+  @deprecated
+  def typeInfo: RowTypeInfo =
+    throw new IllegalArgumentException("Implement this if legacy types are expected.")
+
+  @deprecated
+  def testAllApis(
+      expr: Expression,
+      exprString: String,
+      sqlExpr: String,
+      expected: String): Unit = {
+    addTableApiTestExpr(expr, expected)
+    addTableApiTestExpr(exprString, expected)
+    addSqlTestExpr(sqlExpr, expected)
+  }
+
+  @deprecated
+  def testTableApi(
+      expr: Expression,
+      exprString: String,
+      expected: String): Unit = {
+    addTableApiTestExpr(expr, expected)
+    addTableApiTestExpr(exprString, expected)
+  }
 }


[flink] 03/04: [hotfix][table-common] Add constraint argument type strategy

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87d80878bda035d4697b65f75a6261a6054b60a9
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 25 16:18:46 2020 +0200

    [hotfix][table-common] Add constraint argument type strategy
---
 .../table/types/inference/InputTypeStrategies.java | 11 +++
 .../strategies/ConstraintArgumentTypeStrategy.java | 88 ++++++++++++++++++++++
 .../types/inference/InputTypeStrategiesTest.java   | 28 ++++++-
 3 files changed, 126 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index db589db..615f392 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy
 import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ArrayInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy;
@@ -41,6 +42,7 @@ import org.apache.flink.table.types.logical.StructuredType.StructuredComparision
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -218,6 +220,15 @@ public final class InputTypeStrategies {
 	}
 
 	/**
+	 * Strategy for an argument that must fulfill a given constraint.
+	 */
+	public static ConstraintArgumentTypeStrategy constraint(
+			String constraintMessage,
+			Function<List<DataType>, Boolean> evaluator) {
+		return new ConstraintArgumentTypeStrategy(constraintMessage, evaluator);
+	}
+
+	/**
 	 * Strategy for a conjunction of multiple {@link ArgumentTypeStrategy}s into one like
 	 * {@code f(NUMERIC && LITERAL)}.
 	 *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
new file mode 100644
index 0000000..4457104
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature.Argument;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Strategy for an argument that must fulfill a given constraint.
+ */
+@Internal
+public final class ConstraintArgumentTypeStrategy implements ArgumentTypeStrategy {
+
+	private final String constraintMessage;
+
+	private final Function<List<DataType>, Boolean> evaluator;
+
+	public ConstraintArgumentTypeStrategy(
+			String constraintMessage,
+			Function<List<DataType>, Boolean> evaluator) {
+		this.constraintMessage = constraintMessage;
+		this.evaluator = evaluator;
+	}
+
+	@Override
+	public Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+		final List<DataType> actualDataTypes = callContext.getArgumentDataTypes();
+
+		// type fulfills constraint
+		if (evaluator.apply(actualDataTypes)) {
+			return Optional.of(actualDataTypes.get(argumentPos));
+		}
+
+		if (throwOnFailure) {
+			throw callContext.newValidationError(
+				constraintMessage,
+				actualDataTypes.toArray());
+		}
+		return Optional.empty();
+	}
+
+	@Override
+	public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+		return Argument.of("<CONSTRAINT>");
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		ConstraintArgumentTypeStrategy that = (ConstraintArgumentTypeStrategy) o;
+		return constraintMessage.equals(that.constraintMessage) && evaluator.equals(that.evaluator);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(constraintMessage, evaluator);
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 38b1888..15f0bb9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -34,6 +34,7 @@ import static org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL
 import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.WILDCARD;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.constraint;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.explicit;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.explicitSequence;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
@@ -540,7 +541,32 @@ public class InputTypeStrategiesTest extends InputTypeStrategiesTestBase {
 				.calledWithArgumentTypes(DataTypes.FLOAT())
 				.expectSignature("f(<EXACT_NUMERIC>)")
 				.expectErrorMessage(
-					"Unsupported argument type. Expected type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'.")
+					"Unsupported argument type. Expected type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'."),
+
+			TestSpec
+				.forStrategy(
+					"Constraint argument type strategy",
+					sequence(
+						and(
+							explicit(DataTypes.BOOLEAN()),
+							constraint(
+								"%s must be nullable.",
+								args -> args.get(0).getLogicalType().isNullable()))))
+				.calledWithArgumentTypes(DataTypes.BOOLEAN())
+				.expectSignature("f([BOOLEAN & <CONSTRAINT>])")
+				.expectArgumentTypes(DataTypes.BOOLEAN()),
+
+			TestSpec
+				.forStrategy(
+					"Constraint argument type strategy invalid",
+					sequence(
+						and(
+							explicit(DataTypes.BOOLEAN().notNull()),
+							constraint(
+								"My constraint says %s must be nullable.",
+								args -> args.get(0).getLogicalType().isNullable()))))
+				.calledWithArgumentTypes(DataTypes.BOOLEAN().notNull())
+				.expectErrorMessage("My constraint says BOOLEAN NOT NULL must be nullable.")
 		);
 	}
 }