You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/26 14:10:07 UTC

[flink] 01/02: [hotfix][table-common] Add logical type root/family argument strategies

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cea14b18c0b9451f5e887a105c13908e839681b3
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 25 13:04:51 2020 +0200

    [hotfix][table-common] Add logical type root/family argument strategies
---
 .../table/types/inference/InputTypeStrategies.java |  36 ++++
 .../strategies/FamilyArgumentTypeStrategy.java     | 138 +++++++++++++++
 .../strategies/RootArgumentTypeStrategy.java       | 105 +++++++++++
 .../types/inference/strategies/StrategyUtils.java  | 192 +++++++++++++++++++++
 .../flink/table/types/logical/DoubleType.java      |   2 +
 .../flink/table/types/logical/FloatType.java       |   2 +
 .../types/logical/utils/LogicalTypeChecks.java     |  12 ++
 .../types/inference/InputTypeStrategiesTest.java   |  83 ++++++++-
 8 files changed, 569 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 770199c..7476aac 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -24,14 +24,18 @@ import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy
 import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ArrayInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.MapInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.OrArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.OrInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.OutputArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.RootArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.SequenceInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.VaryingSequenceInputTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.WildcardInputTypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.util.Arrays;
 import java.util.List;
@@ -170,6 +174,38 @@ public final class InputTypeStrategies {
 	}
 
 	/**
+	 * Strategy for an argument that corresponds to a given {@link LogicalTypeRoot}. The nullability of
+	 * the argument is not checked. Implicit casts will be inserted if possible.
+	 */
+	public static RootArgumentTypeStrategy logical(LogicalTypeRoot expectedRoot) {
+		return new RootArgumentTypeStrategy(expectedRoot, null); // null: no expected nullability
+	}
+
+	/**
+	 * Strategy for an argument that corresponds to a given {@link LogicalTypeRoot} and nullability
+	 * ({@code false} rejects nullable arguments). Implicit casts will be inserted if possible.
+	 */
+	public static RootArgumentTypeStrategy logical(LogicalTypeRoot expectedRoot, boolean expectedNullability) {
+		return new RootArgumentTypeStrategy(expectedRoot, expectedNullability);
+	}
+
+	/**
+	 * Strategy for an argument that corresponds to a given {@link LogicalTypeFamily}. The nullability
+	 * of the argument is not checked. Implicit casts will be inserted if possible.
+	 */
+	public static FamilyArgumentTypeStrategy logical(LogicalTypeFamily expectedFamily) {
+		return new FamilyArgumentTypeStrategy(expectedFamily, null); // null: no expected nullability
+	}
+
+	/**
+	 * Strategy for an argument that corresponds to a given {@link LogicalTypeFamily} and nullability
+	 * ({@code false} rejects nullable arguments). Implicit casts will be inserted if possible.
+	 */
+	public static FamilyArgumentTypeStrategy logical(LogicalTypeFamily expectedFamily, boolean expectedNullability) {
+		return new FamilyArgumentTypeStrategy(expectedFamily, expectedNullability);
+	}
+
+	/**
 	 * Strategy for a conjunction of multiple {@link ArgumentTypeStrategy}s into one like
 	 * {@code f(NUMERIC && LITERAL)}.
 	 *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
new file mode 100644
index 0000000..b5f154f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.inference.strategies.StrategyUtils.findDataType;
+
+/**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeFamily} and nullability.
+ *
+ * <p>Implicit casts will be inserted if possible.
+ */
+@Internal
+public final class FamilyArgumentTypeStrategy implements ArgumentTypeStrategy {
+
+	private final LogicalTypeFamily expectedFamily;
+
+	private final @Nullable Boolean expectedNullability;
+
+	private static final Map<LogicalTypeFamily, LogicalTypeRoot> familyToRoot = new HashMap<>();
+	static {
+		// "fallback" roots that are tried if an argument is not part of the family (e.g. a NULL literal);
+		// NULL literals receive the smallest precision possible to have little impact when finding a common type.
+		familyToRoot.put(LogicalTypeFamily.NUMERIC, LogicalTypeRoot.TINYINT);
+		familyToRoot.put(LogicalTypeFamily.EXACT_NUMERIC, LogicalTypeRoot.TINYINT);
+		familyToRoot.put(LogicalTypeFamily.CHARACTER_STRING, LogicalTypeRoot.VARCHAR);
+		familyToRoot.put(LogicalTypeFamily.BINARY_STRING, LogicalTypeRoot.VARBINARY);
+		familyToRoot.put(LogicalTypeFamily.APPROXIMATE_NUMERIC, LogicalTypeRoot.DOUBLE);
+		familyToRoot.put(LogicalTypeFamily.TIMESTAMP, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+		familyToRoot.put(LogicalTypeFamily.TIME, LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+	}
+
+	public FamilyArgumentTypeStrategy(LogicalTypeFamily expectedFamily, @Nullable Boolean expectedNullability) {
+		Preconditions.checkArgument(
+			familyToRoot.containsKey(expectedFamily), // only families with a fallback root are supported
+			"Unsupported family '%s' for argument type strategy.", expectedFamily);
+		this.expectedFamily = expectedFamily;
+		this.expectedNullability = expectedNullability;
+	}
+
+	/**
+	 * Accepts an argument of the expected family as it is and otherwise tries the family's fallback root.
+	 * @return the inferred type, or empty if the argument is invalid and {@code throwOnFailure} is false
+	 */
+	@Override
+	public Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+		final DataType actualDataType = callContext.getArgumentDataTypes().get(argumentPos);
+		final LogicalType actualType = actualDataType.getLogicalType();
+
+		// NOT NULL was requested but the argument is nullable
+		if (Objects.equals(expectedNullability, Boolean.FALSE) && actualType.isNullable()) {
+			if (throwOnFailure) {
+				throw callContext.newValidationError(
+					"Unsupported argument type. Expected non-nullable type of family '%s' but actual type was '%s'.",
+					expectedFamily,
+					actualType);
+			}
+			return Optional.empty();
+		}
+
+		// type is part of the family
+		if (actualType.getTypeRoot().getFamilies().contains(expectedFamily)) {
+			return Optional.of(actualDataType);
+		}
+
+		// find a type for the family; failures are reported below in terms of the family instead of the root
+		final Optional<DataType> inferredDataType =
+			findDataType(callContext, false, actualDataType, familyToRoot.get(expectedFamily), expectedNullability);
+		if (!inferredDataType.isPresent() && throwOnFailure) {
+			throw callContext.newValidationError(
+				"Unsupported argument type. Expected type of family '%s' but actual type was '%s'.",
+				expectedFamily,
+				actualType);
+		}
+		return inferredDataType;
+	}
+
+	@Override
+	public Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+		// "< ... >" to indicate that this is not a type
+		final String description = expectedNullability == null
+			? "<" + expectedFamily + ">"
+			: expectedNullability
+				? "<" + expectedFamily + " NULL>"
+				: "<" + expectedFamily + " NOT NULL>";
+		return Signature.Argument.of(description);
+	}
+
+	/** Strategies are equal if they are of the same class and expect the same family and nullability. */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final FamilyArgumentTypeStrategy other = (FamilyArgumentTypeStrategy) o;
+		return Objects.equals(expectedNullability, other.expectedNullability)
+			&& expectedFamily == other.expectedFamily;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(expectedFamily, expectedNullability); // same fields as compared in equals()
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
new file mode 100644
index 0000000..8983caa
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.inference.strategies.StrategyUtils.findDataType;
+
+/**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeRoot} and nullability.
+ *
+ * <p>Implicit casts will be inserted if possible.
+ */
+@Internal
+public final class RootArgumentTypeStrategy implements ArgumentTypeStrategy {
+
+	private final LogicalTypeRoot expectedRoot;
+
+	private final @Nullable Boolean expectedNullability;
+
+	public RootArgumentTypeStrategy(LogicalTypeRoot expectedRoot, @Nullable Boolean expectedNullability) {
+		this.expectedRoot = Preconditions.checkNotNull(expectedRoot);
+		this.expectedNullability = expectedNullability; // null: the argument's nullability is not checked
+	}
+
+	@Override
+	public Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+		final DataType argumentDataType = callContext.getArgumentDataTypes().get(argumentPos);
+		final LogicalType argumentType = argumentDataType.getLogicalType();
+		final boolean isNotNullExpected = Objects.equals(expectedNullability, Boolean.FALSE);
+
+		// regular path: keep a type of the expected root or derive one that the argument can be cast to
+		if (!isNotNullExpected || !argumentType.isNullable()) {
+			return findDataType(
+				callContext, throwOnFailure, argumentDataType, expectedRoot, expectedNullability);
+		}
+
+		// NOT NULL was requested but the argument is nullable
+		if (!throwOnFailure) {
+			return Optional.empty();
+		}
+		// NOTE(review): "Expected nullable type" reads inverted for this check; kept as InputTypeStrategiesTest asserts it
+		throw callContext.newValidationError(
+			"Unsupported argument type. Expected nullable type of root '%s' but actual type was '%s'.",
+			expectedRoot,
+			argumentType);
+	}
+
+	@Override
+	public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+		// "< ... >" to indicate that this is not a type
+		final String description = expectedNullability == null
+			? "<" + expectedRoot + ">"
+			: expectedNullability
+				? "<" + expectedRoot + " NULL>"
+				: "<" + expectedRoot + " NOT NULL>";
+		return Argument.of(description);
+	}
+
+	/** Strategies are equal if they are of the same class and expect the same root and nullability. */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final RootArgumentTypeStrategy other = (RootArgumentTypeStrategy) o;
+		return Objects.equals(expectedNullability, other.expectedNullability)
+			&& expectedRoot == other.expectedRoot;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(expectedRoot, expectedNullability); // same fields as compared in equals()
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
new file mode 100644
index 0000000..ed45a10
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.EXACT_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.CHAR;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utilities for shared logic in classes of this package.
+ */
+final class StrategyUtils {
+
+	/**
+	 * Derives a data type of the given logical root that stays close to the given data type in terms of
+	 * nullability and conversion class. Returns empty if no type can be derived; if the actual type cannot
+	 * be implicitly cast to the derived type, returns empty or throws depending on {@code throwOnFailure}.
+	 */
+	static Optional<DataType> findDataType(
+			CallContext callContext,
+			boolean throwOnFailure,
+			DataType actualDataType,
+			LogicalTypeRoot expectedRoot,
+			@Nullable Boolean expectedNullability) {
+		final LogicalType actualType = actualDataType.getLogicalType();
+
+		final DataType rootDataType = findDataTypeOfRoot(actualDataType, expectedRoot);
+		if (rootDataType == null) {
+			// no data type could be derived for the root
+			return Optional.empty();
+		}
+
+		// set nullability: an explicit expectation wins, otherwise the actual nullability is kept
+		final boolean isNullable;
+		if (expectedNullability != null) {
+			isNullable = expectedNullability;
+		} else {
+			isNullable = actualType.isNullable();
+		}
+		DataType candidate = isNullable ? rootDataType.nullable() : rootDataType.notNull();
+
+		// preserve bridging class if possible
+		final Class<?> conversionClass = actualDataType.getConversionClass();
+		if (candidate.getLogicalType().supportsOutputConversion(conversionClass)) {
+			candidate = candidate.bridgedTo(conversionClass);
+		}
+
+		// check if type can be implicitly cast
+		if (supportsImplicitCast(actualType, candidate.getLogicalType())) {
+			return Optional.of(candidate);
+		}
+		if (throwOnFailure) {
+			throw callContext.newValidationError(
+				"Unsupported argument type. Expected type root '%s' but actual type was '%s'.",
+				expectedRoot,
+				actualType);
+		}
+		return Optional.empty();
+	}
+
+	/**
+	 * Returns a data type for the given data type and expected root, or {@code null} if none can be derived.
+	 *
+	 * <p>This method is aligned with {@link LogicalTypeCasts#supportsImplicitCast(LogicalType, LogicalType)}.
+	 *
+	 * <p>The "fallback" data type for each root represents the default data type for a NULL literal. NULL
+	 * literals will receive the smallest precision possible for having little impact when finding a common
+	 * type. The caller still needs to check whether an implicit cast to the returned type is supported.
+	 */
+	private static @Nullable DataType findDataTypeOfRoot(
+			DataType actualDataType,
+			LogicalTypeRoot expectedRoot) {
+		final LogicalType actualType = actualDataType.getLogicalType();
+		if (hasRoot(actualType, expectedRoot)) {
+			return actualDataType; // already of the expected root, the argument is kept as it is
+		}
+		switch (expectedRoot) {
+			case CHAR:
+				return DataTypes.CHAR(CharType.DEFAULT_LENGTH);
+			case VARCHAR:
+				if (hasRoot(actualType, CHAR)) {
+					return DataTypes.VARCHAR(getLength(actualType)); // keep the length of the CHAR argument
+				}
+				return DataTypes.VARCHAR(VarCharType.DEFAULT_LENGTH);
+			case BOOLEAN:
+				return DataTypes.BOOLEAN();
+			case BINARY:
+				return DataTypes.BINARY(BinaryType.DEFAULT_LENGTH);
+			case VARBINARY:
+				if (hasRoot(actualType, BINARY)) {
+					return DataTypes.VARBINARY(getLength(actualType)); // keep the length of the BINARY argument
+				}
+				return DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH);
+			case DECIMAL:
+				if (hasFamily(actualType, EXACT_NUMERIC)) {
+					return DataTypes.DECIMAL(getPrecision(actualType), getScale(actualType));
+				} else if (hasFamily(actualType, APPROXIMATE_NUMERIC)) {
+					final int precision = getPrecision(actualType);
+					// the digits may occur before or after the dot, so room for both is reserved
+					return DataTypes.DECIMAL(precision * 2, precision);
+				}
+				return DataTypes.DECIMAL(DecimalType.MIN_PRECISION, DecimalType.MIN_SCALE);
+			case TINYINT:
+				return DataTypes.TINYINT();
+			case SMALLINT:
+				return DataTypes.SMALLINT();
+			case INTEGER:
+				return DataTypes.INT();
+			case BIGINT:
+				return DataTypes.BIGINT();
+			case FLOAT:
+				return DataTypes.FLOAT();
+			case DOUBLE:
+				return DataTypes.DOUBLE();
+			case DATE:
+				return DataTypes.DATE();
+			case TIME_WITHOUT_TIME_ZONE:
+				if (hasRoot(actualType, TIMESTAMP_WITHOUT_TIME_ZONE)) {
+					return DataTypes.TIME(getPrecision(actualType)); // keep the precision of the TIMESTAMP argument
+				}
+				return DataTypes.TIME();
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return DataTypes.TIMESTAMP();
+			case TIMESTAMP_WITH_TIME_ZONE:
+				return DataTypes.TIMESTAMP_WITH_TIME_ZONE();
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+			case INTERVAL_YEAR_MONTH:
+				return DataTypes.INTERVAL(DataTypes.MONTH());
+			case INTERVAL_DAY_TIME:
+				return DataTypes.INTERVAL(DataTypes.SECOND());
+			case NULL:
+				return DataTypes.NULL();
+			case ARRAY:
+			case MULTISET:
+			case MAP:
+			case ROW:
+			case DISTINCT_TYPE:
+			case STRUCTURED_TYPE:
+			case RAW:
+			case SYMBOL:
+			case UNRESOLVED:
+			default:
+				return null; // no data type is derived for these roots
+		}
+	}
+
+	private StrategyUtils() {
+		// utility class with static methods only, no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
index ea4f5b4..f72d724 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
@@ -33,6 +33,8 @@ import java.util.Set;
 @PublicEvolving
 public final class DoubleType extends LogicalType {
 
+	public static final int PRECISION = 15; // fixed precision of DOUBLE, adopted from Calcite
+
 	private static final String FORMAT = "DOUBLE";
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java
index 7a52ac3a..155d3b4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java
@@ -33,6 +33,8 @@ import java.util.Set;
 @PublicEvolving
 public final class FloatType extends LogicalType {
 
+	public static final int PRECISION = 7; // fixed precision of FLOAT, adopted from Calcite
+
 	private static final String FORMAT = "FLOAT";
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
index b6117f4..31ee3f8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DayTimeIntervalType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -282,6 +284,16 @@ public final class LogicalTypeChecks {
 		}
 
 		@Override
+		public Integer visit(FloatType floatType) {
+			return FloatType.PRECISION; // same constant for every FLOAT type, independent of the instance
+		}
+
+		@Override
+		public Integer visit(DoubleType doubleType) {
+			return DoubleType.PRECISION; // same constant for every DOUBLE type, independent of the instance
+		}
+
+		@Override
 		public Integer visit(TimeType timeType) {
 			return timeType.getPrecision();
 		}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 431f72d..7bdc29c 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.utils.CallContextMock;
 import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 
 import org.junit.Rule;
@@ -52,6 +54,7 @@ import static org.apache.flink.table.types.inference.InputTypeStrategies.WILDCAR
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.explicit;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.explicitSequence;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.sequence;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
@@ -481,7 +484,85 @@ public class InputTypeStrategiesTest {
 				"Map strategy fails for an odd number of arguments",
 				InputTypeStrategies.SPECIFIC_FOR_MAP)
 				.calledWithArgumentTypes(DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT())
-				.expectErrorMessage("Invalid number of arguments. 3 arguments passed.")
+				.expectErrorMessage("Invalid number of arguments. 3 arguments passed."),
+
+			TestSpec
+				.forStrategy(
+					"Logical type roots instead of concrete data types",
+					sequence(
+						logical(LogicalTypeRoot.VARCHAR),
+						logical(LogicalTypeRoot.DECIMAL, true),
+						logical(LogicalTypeRoot.DECIMAL),
+						logical(LogicalTypeRoot.BOOLEAN),
+						logical(LogicalTypeRoot.INTEGER, false),
+						logical(LogicalTypeRoot.INTEGER)))
+				.calledWithArgumentTypes(
+					DataTypes.NULL(),
+					DataTypes.INT(),
+					DataTypes.DOUBLE(),
+					DataTypes.BOOLEAN().notNull(),
+					DataTypes.INT().notNull(),
+					DataTypes.INT().notNull())
+				.expectSignature(
+					"f(<VARCHAR>, <DECIMAL NULL>, <DECIMAL>, <BOOLEAN>, <INTEGER NOT NULL>, <INTEGER>)")
+				.expectArgumentTypes(
+					DataTypes.VARCHAR(1),
+					DataTypes.DECIMAL(10, 0),
+					DataTypes.DECIMAL(30, 15),
+					DataTypes.BOOLEAN().notNull(),
+					DataTypes.INT().notNull(),
+					DataTypes.INT().notNull()),
+
+			TestSpec
+				.forStrategy(
+					"Logical type roots with wrong implicit cast",
+					sequence(logical(LogicalTypeRoot.VARCHAR)))
+				.calledWithArgumentTypes(DataTypes.INT())
+				.expectSignature("f(<VARCHAR>)")
+				.expectErrorMessage(
+					"Unsupported argument type. Expected type root 'VARCHAR' but actual type was 'INT'."),
+
+			TestSpec
+				.forStrategy(
+					"Logical type roots with wrong nullability",
+					sequence(logical(LogicalTypeRoot.VARCHAR, false)))
+				.calledWithArgumentTypes(DataTypes.VARCHAR(5))
+				.expectSignature("f(<VARCHAR NOT NULL>)")
+				.expectErrorMessage(
+					"Unsupported argument type. Expected nullable type of root 'VARCHAR' but actual type was 'VARCHAR(5)'."),
+
+			TestSpec
+				.forStrategy(
+					"Logical type family instead of concrete data types",
+					sequence(
+						logical(LogicalTypeFamily.CHARACTER_STRING, true),
+						logical(LogicalTypeFamily.EXACT_NUMERIC),
+						logical(LogicalTypeFamily.APPROXIMATE_NUMERIC),
+						logical(LogicalTypeFamily.APPROXIMATE_NUMERIC),
+						logical(LogicalTypeFamily.APPROXIMATE_NUMERIC, false)))
+				.calledWithArgumentTypes(
+					DataTypes.NULL(),
+					DataTypes.TINYINT(),
+					DataTypes.INT(),
+					DataTypes.BIGINT().notNull(),
+					DataTypes.DECIMAL(10, 2).notNull())
+				.expectSignature(
+					"f(<CHARACTER_STRING NULL>, <EXACT_NUMERIC>, <APPROXIMATE_NUMERIC>, <APPROXIMATE_NUMERIC>, <APPROXIMATE_NUMERIC NOT NULL>)")
+				.expectArgumentTypes(
+					DataTypes.VARCHAR(1),
+					DataTypes.TINYINT(),
+					DataTypes.DOUBLE(), // widening with preserved nullability
+					DataTypes.DOUBLE().notNull(), // widening with preserved nullability
+					DataTypes.DOUBLE().notNull()),
+
+			TestSpec
+				.forStrategy(
+					"Logical type family with invalid type",
+					sequence(logical(LogicalTypeFamily.EXACT_NUMERIC)))
+				.calledWithArgumentTypes(DataTypes.FLOAT())
+				.expectSignature("f(<EXACT_NUMERIC>)")
+				.expectErrorMessage(
+					"Unsupported argument type. Expected type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'.")
 		);
 	}