You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/26 14:10:07 UTC
[flink] 01/02: [hotfix][table-common] Add logical type root/family
argument strategies
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cea14b18c0b9451f5e887a105c13908e839681b3
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 25 13:04:51 2020 +0200
[hotfix][table-common] Add logical type root/family argument strategies
---
.../table/types/inference/InputTypeStrategies.java | 36 ++++
.../strategies/FamilyArgumentTypeStrategy.java | 138 +++++++++++++++
.../strategies/RootArgumentTypeStrategy.java | 105 +++++++++++
.../types/inference/strategies/StrategyUtils.java | 192 +++++++++++++++++++++
.../flink/table/types/logical/DoubleType.java | 2 +
.../flink/table/types/logical/FloatType.java | 2 +
.../types/logical/utils/LogicalTypeChecks.java | 12 ++
.../types/inference/InputTypeStrategiesTest.java | 83 ++++++++-
8 files changed, 569 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 770199c..7476aac 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -24,14 +24,18 @@ import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy
import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ArrayInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.MapInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.OrArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.OrInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.OutputArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.RootArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SequenceInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.VaryingSequenceInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.WildcardInputTypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import java.util.Arrays;
import java.util.List;
@@ -170,6 +174,38 @@ public final class InputTypeStrategies {
}
/**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeRoot}.
+ * Implicit casts will be inserted if possible.
+ */
+ public static RootArgumentTypeStrategy logical(LogicalTypeRoot expectedRoot) {
+ // a null nullability places no constraint on the nullability of the argument
+ final Boolean unconstrainedNullability = null;
+ return new RootArgumentTypeStrategy(expectedRoot, unconstrainedNullability);
+ }
+
+ /**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeRoot} and nullability.
+ * Implicit casts will be inserted if possible.
+ */
+ public static RootArgumentTypeStrategy logical(LogicalTypeRoot expectedRoot, boolean expectedNullability) {
+ // box explicitly: the strategy takes a nullable Boolean for the expected nullability
+ final Boolean nullability = Boolean.valueOf(expectedNullability);
+ return new RootArgumentTypeStrategy(expectedRoot, nullability);
+ }
+
+ /**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeFamily}.
+ * Implicit casts will be inserted if possible.
+ */
+ public static FamilyArgumentTypeStrategy logical(LogicalTypeFamily expectedFamily) {
+ // a null nullability places no constraint on the nullability of the argument
+ final Boolean unconstrainedNullability = null;
+ return new FamilyArgumentTypeStrategy(expectedFamily, unconstrainedNullability);
+ }
+
+ /**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeFamily} and nullability.
+ * Implicit casts will be inserted if possible.
+ */
+ public static FamilyArgumentTypeStrategy logical(LogicalTypeFamily expectedFamily, boolean expectedNullability) {
+ // box explicitly: the strategy takes a nullable Boolean for the expected nullability
+ final Boolean nullability = Boolean.valueOf(expectedNullability);
+ return new FamilyArgumentTypeStrategy(expectedFamily, nullability);
+ }
+
+ /**
* Strategy for a conjunction of multiple {@link ArgumentTypeStrategy}s into one like
* {@code f(NUMERIC && LITERAL)}.
*
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
new file mode 100644
index 0000000..b5f154f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FamilyArgumentTypeStrategy.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.inference.strategies.StrategyUtils.findDataType;
+
+/**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeFamily} and nullability.
+ *
+ * <p>Implicit casts will be inserted if possible.
+ */
+@Internal
+public final class FamilyArgumentTypeStrategy implements ArgumentTypeStrategy {
+
+ private final LogicalTypeFamily expectedFamily;
+
+ private final @Nullable Boolean expectedNullability;
+
+ // Families supported by this strategy, each mapped to the root that is used to find a
+ // data type when the actual argument type is not already part of the family.
+ // The constructor rejects any family that has no entry here.
+ private static final Map<LogicalTypeFamily, LogicalTypeRoot> familyToRoot = new HashMap<>();
+ static {
+ // "Fallback" roots for NULL literals:
+ // they receive the smallest precision possible so that they have little impact when finding a common type.
+ familyToRoot.put(LogicalTypeFamily.NUMERIC, LogicalTypeRoot.TINYINT);
+ familyToRoot.put(LogicalTypeFamily.EXACT_NUMERIC, LogicalTypeRoot.TINYINT);
+ familyToRoot.put(LogicalTypeFamily.CHARACTER_STRING, LogicalTypeRoot.VARCHAR);
+ familyToRoot.put(LogicalTypeFamily.BINARY_STRING, LogicalTypeRoot.VARBINARY);
+ familyToRoot.put(LogicalTypeFamily.APPROXIMATE_NUMERIC, LogicalTypeRoot.DOUBLE);
+ familyToRoot.put(LogicalTypeFamily.TIMESTAMP, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ familyToRoot.put(LogicalTypeFamily.TIME, LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ }
+
+ public FamilyArgumentTypeStrategy(LogicalTypeFamily expectedFamily, @Nullable Boolean expectedNullability) {
+ // only families with a registered fallback root can be handled by this strategy
+ final boolean hasFallbackRoot = familyToRoot.containsKey(expectedFamily);
+ Preconditions.checkArgument(hasFallbackRoot, "Unsupported family for argument type strategy.");
+ this.expectedNullability = expectedNullability;
+ this.expectedFamily = expectedFamily;
+ }
+
+ /**
+ * Infers the data type for the argument at {@code argumentPos} against the expected family.
+ *
+ * <p>A nullable argument is rejected if a NOT NULL argument is expected. An argument whose type
+ * root already belongs to the expected family is returned unchanged (its nullability is not
+ * adjusted). Otherwise the fallback root of the family is used to look for a data type that the
+ * argument can be implicitly cast to.
+ *
+ * @param callContext context providing the argument data types and the validation error factory
+ * @param argumentPos position of the argument to infer
+ * @param throwOnFailure whether a validation error is thrown instead of returning empty
+ * @return the inferred data type, or empty if the argument does not match and
+ * {@code throwOnFailure} is {@code false}
+ */
+ @Override
+ public Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+ final DataType actualDataType = callContext.getArgumentDataTypes().get(argumentPos);
+ final LogicalType actualType = actualDataType.getLogicalType();
+
+ // a NOT NULL expectation cannot be satisfied by a nullable argument
+ if (Objects.equals(expectedNullability, Boolean.FALSE) && actualType.isNullable()) {
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Unsupported argument type. Expected non-nullable type of family '%s' but actual type was '%s'.",
+ expectedFamily,
+ actualType);
+ }
+ return Optional.empty();
+ }
+
+ // type is part of the family
+ if (actualType.getTypeRoot().getFamilies().contains(expectedFamily)) {
+ return Optional.of(actualDataType);
+ }
+
+ // find a type for the family;
+ // the lookup itself must not throw so that a failure is reported in terms of the family below
+ final LogicalTypeRoot expectedRoot = familyToRoot.get(expectedFamily);
+ final Optional<DataType> inferredDataType = findDataType(
+ callContext,
+ false,
+ actualDataType,
+ expectedRoot,
+ expectedNullability);
+ if (!inferredDataType.isPresent() && throwOnFailure) {
+ throw callContext.newValidationError(
+ "Unsupported argument type. Expected type of family '%s' but actual type was '%s'.",
+ expectedFamily,
+ actualType);
+ }
+ return inferredDataType;
+ }
+
+ @Override
+ public Signature.Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ // an expected nullability, if any, is appended to the family name
+ final String nullabilitySuffix;
+ if (expectedNullability == null) {
+ nullabilitySuffix = "";
+ } else if (expectedNullability) {
+ nullabilitySuffix = " NULL";
+ } else {
+ nullabilitySuffix = " NOT NULL";
+ }
+ // "< ... >" to indicate that this is not a type
+ return Signature.Argument.of("<" + expectedFamily + nullabilitySuffix + ">");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (getClass() != o.getClass()) {
+ return false;
+ }
+ // equal if both the family and the expected nullability match
+ final FamilyArgumentTypeStrategy other = (FamilyArgumentTypeStrategy) o;
+ if (expectedFamily != other.expectedFamily) {
+ return false;
+ }
+ return Objects.equals(expectedNullability, other.expectedNullability);
+ }
+
+ @Override
+ public int hashCode() {
+ // hashes exactly the two fields that equals() compares
+ return Objects.hash(expectedFamily, expectedNullability);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
new file mode 100644
index 0000000..8983caa
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RootArgumentTypeStrategy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.inference.strategies.StrategyUtils.findDataType;
+
+/**
+ * Strategy for an argument that corresponds to a given {@link LogicalTypeRoot} and nullability.
+ *
+ * <p>Implicit casts will be inserted if possible.
+ */
+@Internal
+public final class RootArgumentTypeStrategy implements ArgumentTypeStrategy {
+
+ private final LogicalTypeRoot expectedRoot;
+
+ private final @Nullable Boolean expectedNullability;
+
+ public RootArgumentTypeStrategy(LogicalTypeRoot expectedRoot, @Nullable Boolean expectedNullability) {
+ // fail fast on a missing root; the nullability may be null
+ Preconditions.checkNotNull(expectedRoot);
+ this.expectedRoot = expectedRoot;
+ this.expectedNullability = expectedNullability;
+ }
+
+ @Override
+ public Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure) {
+ final DataType argumentDataType = callContext.getArgumentDataTypes().get(argumentPos);
+ final LogicalType argumentType = argumentDataType.getLogicalType();
+
+ // a NOT NULL expectation cannot be satisfied by a nullable argument
+ final boolean violatesNotNull = Boolean.FALSE.equals(expectedNullability) && argumentType.isNullable();
+ if (!violatesNotNull) {
+ // look for a data type of the expected root that the argument can be implicitly cast to
+ return findDataType(
+ callContext,
+ throwOnFailure,
+ argumentDataType,
+ expectedRoot,
+ expectedNullability);
+ }
+
+ if (!throwOnFailure) {
+ return Optional.empty();
+ }
+ // NOTE(review): the message says "nullable" although this branch rejects a nullable argument
+ // where NOT NULL is expected; the wording is kept because InputTypeStrategiesTest asserts this exact text.
+ throw callContext.newValidationError(
+ "Unsupported argument type. Expected nullable type of root '%s' but actual type was '%s'.",
+ expectedRoot,
+ argumentType);
+ }
+
+ @Override
+ public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) {
+ // an expected nullability, if any, is appended to the root name
+ final String nullabilitySuffix;
+ if (expectedNullability == null) {
+ nullabilitySuffix = "";
+ } else if (expectedNullability) {
+ nullabilitySuffix = " NULL";
+ } else {
+ nullabilitySuffix = " NOT NULL";
+ }
+ // "< ... >" to indicate that this is not a type
+ return Argument.of("<" + expectedRoot + nullabilitySuffix + ">");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null) {
+ return false;
+ }
+ if (getClass() != o.getClass()) {
+ return false;
+ }
+ // equal if both the root and the expected nullability match
+ final RootArgumentTypeStrategy other = (RootArgumentTypeStrategy) o;
+ if (expectedRoot != other.expectedRoot) {
+ return false;
+ }
+ return Objects.equals(expectedNullability, other.expectedNullability);
+ }
+
+ @Override
+ public int hashCode() {
+ // hashes exactly the two fields that equals() compares
+ return Objects.hash(expectedRoot, expectedNullability);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
new file mode 100644
index 0000000..ed45a10
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.EXACT_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.CHAR;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utilities for shared logic in classes of this package.
+ */
+final class StrategyUtils {
+
+ /**
+ * Looks for a data type of the given logical root that stays close to the given data type with
+ * respect to nullability and conversion class.
+ *
+ * <p>The result is empty if no data type exists for the expected root (this case never throws)
+ * or if the actual type cannot be implicitly cast to the candidate and {@code throwOnFailure}
+ * is {@code false}.
+ */
+ static Optional<DataType> findDataType(
+ CallContext callContext,
+ boolean throwOnFailure,
+ DataType actualDataType,
+ LogicalTypeRoot expectedRoot,
+ @Nullable Boolean expectedNullability) {
+ final LogicalType actualType = actualDataType.getLogicalType();
+
+ final DataType rootDataType = findDataTypeOfRoot(actualDataType, expectedRoot);
+ if (rootDataType == null) {
+ return Optional.empty();
+ }
+
+ // set nullability: an explicit expectation wins, otherwise the actual nullability is kept
+ final boolean nullable = expectedNullability != null ? expectedNullability : actualType.isNullable();
+ DataType candidate = nullable ? rootDataType.nullable() : rootDataType.notNull();
+
+ // preserve bridging class if possible
+ final Class<?> conversionClass = actualDataType.getConversionClass();
+ if (candidate.getLogicalType().supportsOutputConversion(conversionClass)) {
+ candidate = candidate.bridgedTo(conversionClass);
+ }
+
+ // check if type can be implicitly casted
+ if (supportsImplicitCast(actualType, candidate.getLogicalType())) {
+ return Optional.of(candidate);
+ }
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Unsupported argument type. Expected type root '%s' but actual type was '%s'.",
+ expectedRoot,
+ actualType);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Returns a data type for the given data type and expected root.
+ *
+ * <p>This method is aligned with {@link LogicalTypeCasts#supportsImplicitCast(LogicalType, LogicalType)}.
+ *
+ * <p>The "fallback" data type for each root represents the default data type for a NULL literal. NULL
+ * literals will receive the smallest precision possible for having little impact when finding a common
+ * type. The output of this method needs to be checked again if an implicit cast is supported.
+ *
+ * @param actualDataType data type of the argument as passed by the caller
+ * @param expectedRoot logical root that the returned data type should have
+ * @return the given data type if it already has the expected root, otherwise a data type derived
+ * for the expected root, or {@code null} for roots without a derived data type
+ */
+ private static @Nullable DataType findDataTypeOfRoot(
+ DataType actualDataType,
+ LogicalTypeRoot expectedRoot) {
+ final LogicalType actualType = actualDataType.getLogicalType();
+ // the argument already has the expected root: return it unchanged
+ if (hasRoot(actualType, expectedRoot)) {
+ return actualDataType;
+ }
+ switch (expectedRoot) {
+ case CHAR:
+ return DataTypes.CHAR(CharType.DEFAULT_LENGTH);
+ case VARCHAR:
+ // a CHAR argument carries its length over to the VARCHAR
+ if (hasRoot(actualType, CHAR)) {
+ return DataTypes.VARCHAR(getLength(actualType));
+ }
+ return DataTypes.VARCHAR(VarCharType.DEFAULT_LENGTH);
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case BINARY:
+ return DataTypes.BINARY(BinaryType.DEFAULT_LENGTH);
+ case VARBINARY:
+ // a BINARY argument carries its length over to the VARBINARY
+ if (hasRoot(actualType, BINARY)) {
+ return DataTypes.VARBINARY(getLength(actualType));
+ }
+ return DataTypes.VARBINARY(VarBinaryType.DEFAULT_LENGTH);
+ case DECIMAL:
+ if (hasFamily(actualType, EXACT_NUMERIC)) {
+ // exact numerics carry their precision and scale over to the DECIMAL
+ return DataTypes.DECIMAL(getPrecision(actualType), getScale(actualType));
+ } else if (hasFamily(actualType, APPROXIMATE_NUMERIC)) {
+ final int precision = getPrecision(actualType);
+ // we don't know where the precision occurs (before or after the dot)
+ return DataTypes.DECIMAL(precision * 2, precision);
+ }
+ return DataTypes.DECIMAL(DecimalType.MIN_PRECISION, DecimalType.MIN_SCALE);
+ case TINYINT:
+ return DataTypes.TINYINT();
+ case SMALLINT:
+ return DataTypes.SMALLINT();
+ case INTEGER:
+ return DataTypes.INT();
+ case BIGINT:
+ return DataTypes.BIGINT();
+ case FLOAT:
+ return DataTypes.FLOAT();
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case DATE:
+ return DataTypes.DATE();
+ case TIME_WITHOUT_TIME_ZONE:
+ // a TIMESTAMP argument carries its precision over to the TIME
+ if (hasRoot(actualType, TIMESTAMP_WITHOUT_TIME_ZONE)) {
+ return DataTypes.TIME(getPrecision(actualType));
+ }
+ return DataTypes.TIME();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return DataTypes.TIMESTAMP();
+ case TIMESTAMP_WITH_TIME_ZONE:
+ return DataTypes.TIMESTAMP_WITH_TIME_ZONE();
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+ case INTERVAL_YEAR_MONTH:
+ return DataTypes.INTERVAL(DataTypes.MONTH());
+ case INTERVAL_DAY_TIME:
+ return DataTypes.INTERVAL(DataTypes.SECOND());
+ case NULL:
+ return DataTypes.NULL();
+ // no data type is derived for the remaining roots
+ case ARRAY:
+ case MULTISET:
+ case MAP:
+ case ROW:
+ case DISTINCT_TYPE:
+ case STRUCTURED_TYPE:
+ case RAW:
+ case SYMBOL:
+ case UNRESOLVED:
+ default:
+ return null;
+ }
+ }
+
+ private StrategyUtils() {
+ // utility class that is only used through its static methods; no instantiation
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
index ea4f5b4..f72d724 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
@@ -33,6 +33,8 @@ import java.util.Set;
@PublicEvolving
public final class DoubleType extends LogicalType {
+ public static final int PRECISION = 15; // adopted from Calcite
+
private static final String FORMAT = "DOUBLE";
private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java
index 7a52ac3a..155d3b4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/FloatType.java
@@ -33,6 +33,8 @@ import java.util.Set;
@PublicEvolving
public final class FloatType extends LogicalType {
+ public static final int PRECISION = 7; // adopted from Calcite
+
private static final String FORMAT = "FLOAT";
private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
index b6117f4..31ee3f8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DayTimeIntervalType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -282,6 +284,16 @@ public final class LogicalTypeChecks {
}
@Override
+ public Integer visit(FloatType floatType) {
+ // the type instance is not consulted: the fixed constant declared on FloatType is reported
+ return FloatType.PRECISION;
+ }
+
+ @Override
+ public Integer visit(DoubleType doubleType) {
+ // the type instance is not consulted: the fixed constant declared on DoubleType is reported
+ return DoubleType.PRECISION;
+ }
+
+ @Override
public Integer visit(TimeType timeType) {
return timeType.getPrecision();
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 431f72d..7bdc29c 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.utils.CallContextMock;
import org.apache.flink.table.types.inference.utils.FunctionDefinitionMock;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.junit.Rule;
@@ -52,6 +54,7 @@ import static org.apache.flink.table.types.inference.InputTypeStrategies.WILDCAR
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.explicit;
import static org.apache.flink.table.types.inference.InputTypeStrategies.explicitSequence;
+import static org.apache.flink.table.types.inference.InputTypeStrategies.logical;
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static org.apache.flink.table.types.inference.InputTypeStrategies.sequence;
import static org.apache.flink.table.types.inference.InputTypeStrategies.varyingSequence;
@@ -481,7 +484,85 @@ public class InputTypeStrategiesTest {
"Map strategy fails for an odd number of arguments",
InputTypeStrategies.SPECIFIC_FOR_MAP)
.calledWithArgumentTypes(DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT())
- .expectErrorMessage("Invalid number of arguments. 3 arguments passed.")
+ .expectErrorMessage("Invalid number of arguments. 3 arguments passed."),
+
+ TestSpec
+ .forStrategy(
+ "Logical type roots instead of concrete data types",
+ sequence(
+ logical(LogicalTypeRoot.VARCHAR),
+ logical(LogicalTypeRoot.DECIMAL, true),
+ logical(LogicalTypeRoot.DECIMAL),
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeRoot.INTEGER, false),
+ logical(LogicalTypeRoot.INTEGER)))
+ .calledWithArgumentTypes(
+ DataTypes.NULL(),
+ DataTypes.INT(),
+ DataTypes.DOUBLE(),
+ DataTypes.BOOLEAN().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull())
+ .expectSignature(
+ "f(<VARCHAR>, <DECIMAL NULL>, <DECIMAL>, <BOOLEAN>, <INTEGER NOT NULL>, <INTEGER>)")
+ .expectArgumentTypes(
+ DataTypes.VARCHAR(1),
+ DataTypes.DECIMAL(10, 0),
+ DataTypes.DECIMAL(30, 15),
+ DataTypes.BOOLEAN().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull()),
+
+ TestSpec
+ .forStrategy(
+ "Logical type roots with wrong implicit cast",
+ sequence(logical(LogicalTypeRoot.VARCHAR)))
+ .calledWithArgumentTypes(DataTypes.INT())
+ .expectSignature("f(<VARCHAR>)")
+ .expectErrorMessage(
+ "Unsupported argument type. Expected type root 'VARCHAR' but actual type was 'INT'."),
+
+ TestSpec
+ .forStrategy(
+ "Logical type roots with wrong nullability",
+ sequence(logical(LogicalTypeRoot.VARCHAR, false)))
+ .calledWithArgumentTypes(DataTypes.VARCHAR(5))
+ .expectSignature("f(<VARCHAR NOT NULL>)")
+ .expectErrorMessage(
+ "Unsupported argument type. Expected nullable type of root 'VARCHAR' but actual type was 'VARCHAR(5)'."),
+
+ TestSpec
+ .forStrategy(
+ "Logical type family instead of concrete data types",
+ sequence(
+ logical(LogicalTypeFamily.CHARACTER_STRING, true),
+ logical(LogicalTypeFamily.EXACT_NUMERIC),
+ logical(LogicalTypeFamily.APPROXIMATE_NUMERIC),
+ logical(LogicalTypeFamily.APPROXIMATE_NUMERIC),
+ logical(LogicalTypeFamily.APPROXIMATE_NUMERIC, false)))
+ .calledWithArgumentTypes(
+ DataTypes.NULL(),
+ DataTypes.TINYINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT().notNull(),
+ DataTypes.DECIMAL(10, 2).notNull())
+ .expectSignature(
+ "f(<CHARACTER_STRING NULL>, <EXACT_NUMERIC>, <APPROXIMATE_NUMERIC>, <APPROXIMATE_NUMERIC>, <APPROXIMATE_NUMERIC NOT NULL>)")
+ .expectArgumentTypes(
+ DataTypes.VARCHAR(1),
+ DataTypes.TINYINT(),
+ DataTypes.DOUBLE(), // widening with preserved nullability
+ DataTypes.DOUBLE().notNull(), // widening with preserved nullability
+ DataTypes.DOUBLE().notNull()),
+
+ TestSpec
+ .forStrategy(
+ "Logical type family with invalid type",
+ sequence(logical(LogicalTypeFamily.EXACT_NUMERIC)))
+ .calledWithArgumentTypes(DataTypes.FLOAT())
+ .expectSignature("f(<EXACT_NUMERIC>)")
+ .expectErrorMessage(
+ "Unsupported argument type. Expected type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'.")
);
}