You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/08 12:28:34 UTC
[flink] branch master updated: [FLINK-12968][table-common] Add an
utility for finding a common type from a set of types
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3bcd47f [FLINK-12968][table-common] Add an utility for finding a common type from a set of types
3bcd47f is described below
commit 3bcd47fac93c14b23bfc8d15f1a6dccd607631fb
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 12:11:19 2019 +0200
[FLINK-12968][table-common] Add an utility for finding a common type from a set of types
This closes #8933.
---
.../table/types/logical/DayTimeIntervalType.java | 2 +
.../flink/table/types/logical/LogicalType.java | 5 +
.../table/types/logical/YearMonthIntervalType.java | 6 +-
.../logical/utils/LogicalTypeGeneralization.java | 658 +++++++++++++++++++++
.../table/types/LogicalTypeGeneralizationTest.java | 325 ++++++++++
5 files changed, 994 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
index a9eb21c..85b700b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
@@ -101,6 +101,8 @@ public final class DayTimeIntervalType extends LogicalType {
/**
* Supported resolutions of this type.
+ *
+ * <p>Note: The order of this enum reflects the granularity from coarse to fine.
*/
public enum DayTimeResolution {
DAY,
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 8b03dcf..4e4942a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.types.logical;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
@@ -43,6 +45,9 @@ import java.util.Set;
* <p>Instances of this class describe the fully parameterized, immutable type with additional
* information such as numeric precision or expected length.
*
+ * <p>The contracts describing how logical types relate to each other are defined by
+ * {@link LogicalTypeCasts} and {@link LogicalTypeGeneralization}.
+ *
* <p>NOTE: A logical type is just a description of a type, a planner or runtime might not support
* every type in every logical precision yet!
*/
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
index 8891398..7c2622d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
@@ -74,11 +74,13 @@ public final class YearMonthIntervalType extends LogicalType {
/**
* Supported resolutions of this type.
+ *
+ * <p>Note: The order of this enum reflects the granularity from coarse to fine.
*/
public enum YearMonthResolution {
YEAR,
- MONTH,
- YEAR_TO_MONTH
+ YEAR_TO_MONTH,
+ MONTH
}
private final YearMonthResolution resolution;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
new file mode 100644
index 0000000..18ffeb3
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.HOUR;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.SECOND;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.BINARY_STRING;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.DATETIME;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.EXACT_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.INTERVAL;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIME;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIMESTAMP;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ANY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_YEAR_MONTH;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.MULTISET;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.NULL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+import static org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.MONTH;
+import static org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.YEAR;
+import static org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utilities for finding a common, more general {@link LogicalType} for a given set of types. If such
+ * a type exists, all given types can be cast to this more general type.
+ *
+ * <p>This class aims to be compatible with the SQL standard. It is inspired by Apache Calcite's
+ * {@code SqlTypeFactoryImpl#leastRestrictive} method.
+ */
+@Internal
+public final class LogicalTypeGeneralization {
+
+ // mappings for interval generalization
+ private static final Map<YearMonthResolution, List<YearMonthResolution>> YEAR_MONTH_RES_TO_BOUNDARIES = new HashMap<>();
+ private static final Map<List<YearMonthResolution>, YearMonthResolution> YEAR_MONTH_BOUNDARIES_TO_RES = new HashMap<>();
+ static {
+ addYearMonthMapping(YEAR, YEAR);
+ addYearMonthMapping(MONTH, MONTH);
+ addYearMonthMapping(YEAR_TO_MONTH, YEAR, MONTH);
+ }
+
+ private static final Map<DayTimeResolution, List<DayTimeResolution>> DAY_TIME_RES_TO_BOUNDARIES = new HashMap<>();
+ private static final Map<List<DayTimeResolution>, DayTimeResolution> DAY_TIME_BOUNDARIES_TO_RES = new HashMap<>();
+ static {
+ addDayTimeMapping(DAY, DAY);
+ addDayTimeMapping(DAY_TO_HOUR, DAY, HOUR);
+ addDayTimeMapping(DAY_TO_MINUTE, DAY, MINUTE);
+ addDayTimeMapping(DAY_TO_SECOND, DAY, SECOND);
+ addDayTimeMapping(HOUR, HOUR);
+ addDayTimeMapping(HOUR_TO_MINUTE, HOUR, MINUTE);
+ addDayTimeMapping(HOUR_TO_SECOND, HOUR, SECOND);
+ addDayTimeMapping(MINUTE, MINUTE);
+ addDayTimeMapping(MINUTE_TO_SECOND, MINUTE, SECOND);
+ addDayTimeMapping(SECOND, SECOND);
+ }
+
+ private static void addYearMonthMapping(YearMonthResolution to, YearMonthResolution... boundaries) {
+ final List<YearMonthResolution> boundariesList = Arrays.asList(boundaries);
+ YEAR_MONTH_RES_TO_BOUNDARIES.put(to, boundariesList);
+ YEAR_MONTH_BOUNDARIES_TO_RES.put(boundariesList, to);
+ }
+
+ private static void addDayTimeMapping(DayTimeResolution to, DayTimeResolution... boundaries) {
+ final List<DayTimeResolution> boundariesList = Arrays.asList(boundaries);
+ DAY_TIME_RES_TO_BOUNDARIES.put(to, boundariesList);
+ DAY_TIME_BOUNDARIES_TO_RES.put(boundariesList, to);
+ }
+
+ /**
+ * Returns a common, more general type for the given types, i.e. a type to which all given types
+ * can be cast. The result is empty if no such type exists. The given list must not be empty.
+ *
+ * <p>For example: {@code [INT, BIGINT, DECIMAL(2, 2)]} would lead to {@code DECIMAL(21, 2)}.
+ */
+ public static Optional<LogicalType> findCommonType(List<LogicalType> types) {
+ Preconditions.checkArgument(types.size() > 0, "List of types must not be empty.");
+
+ // collect statistics first: presence of ANY/NULL types and whether any input is nullable
+ boolean hasAnyType = false;
+ boolean hasNullType = false;
+ boolean hasNullableTypes = false;
+ for (LogicalType type : types) {
+ final LogicalTypeRoot typeRoot = type.getTypeRoot();
+ if (typeRoot == ANY) {
+ hasAnyType = true;
+ } else if (typeRoot == NULL) {
+ hasNullType = true;
+ }
+ if (type.isNullable()) {
+ hasNullableTypes = true;
+ }
+ }
+ // compare all types as nullable copies; the collected nullability is applied to the result below
+ final List<LogicalType> normalizedTypes = types.stream()
+ .map(t -> t.copy(true))
+ .collect(Collectors.toList());
+
+ LogicalType foundType = findCommonNullableType(normalizedTypes, hasAnyType, hasNullType);
+ if (foundType == null) {
+ foundType = findCommonCastableType(normalizedTypes);
+ }
+
+ if (foundType != null) {
+ final LogicalType typeWithNullability = foundType.copy(hasNullableTypes);
+ return Optional.of(typeWithNullability);
+ }
+ return Optional.empty();
+ }
+
+ private static @Nullable LogicalType findCommonCastableType(List<LogicalType> normalizedTypes) {
+ LogicalType resultType = normalizedTypes.get(0);
+
+ for (LogicalType type : normalizedTypes) {
+ final LogicalTypeRoot typeRoot = type.getTypeRoot();
+
+ // NULL does not affect the result of this loop
+ if (typeRoot == NULL) {
+ continue;
+ }
+
+ if (supportsImplicitCast(resultType, type)) {
+ resultType = type;
+ } else {
+ if (!supportsImplicitCast(type, resultType)) {
+ return null;
+ }
+ }
+ }
+
+ return resultType;
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static @Nullable LogicalType findCommonNullableType(
+ List<LogicalType> normalizedTypes,
+ boolean hasAnyType,
+ boolean hasNullType) {
+ // folds the non-NULL types left to right into one result type; null means "no rule applies"
+ // if an ANY type is present, a common type exists only if all types are exactly equal
+ if (hasAnyType) {
+ return findExactlySameType(normalizedTypes);
+ }
+
+ LogicalType resultType = null;
+
+ for (LogicalType type : normalizedTypes) {
+ final LogicalTypeRoot typeRoot = type.getTypeRoot();
+
+ // NULL does not affect the result of this loop
+ if (typeRoot == NULL) {
+ continue;
+ }
+
+ // if result type is still null, consider the current type as a potential
+ // result type candidate
+ if (resultType == null) {
+ resultType = type;
+ }
+
+ // find special patterns
+ final LogicalType patternType = findCommonTypePattern(resultType, type);
+ if (patternType != null) {
+ resultType = patternType;
+ continue;
+ }
+
+ // for types of family CONSTRUCTED: delegate to a child-wise comparison of the whole list
+ if (typeRoot == ARRAY) {
+ return findCommonArrayType(normalizedTypes);
+ } else if (typeRoot == MULTISET) {
+ return findCommonMultisetType(normalizedTypes);
+ } else if (typeRoot == MAP) {
+ return findCommonMapType(normalizedTypes);
+ } else if (typeRoot == ROW) {
+ return findCommonRowType(normalizedTypes);
+ }
+
+ // exit if two completely different types are compared (e.g. ROW and INT)
+ // this simplifies the following lines as we compare same interval families for example
+ if (!areSimilarTypes(resultType, type)) {
+ return null;
+ }
+
+ // for types of family CHARACTER_STRING or BINARY_STRING
+ if (hasFamily(type, CHARACTER_STRING) || hasFamily(type, BINARY_STRING)) {
+ final int length = combineLength(resultType, type);
+
+ if (hasRoot(resultType, VARCHAR) || hasRoot(resultType, VARBINARY)) {
+ // for variable length type we are done here
+ resultType = createStringType(resultType.getTypeRoot(), length);
+ } else {
+ // for mixed fixed/variable or fixed/fixed lengths
+ resultType = createStringType(typeRoot, length);
+ }
+ }
+ // for EXACT_NUMERIC types
+ else if (hasFamily(type, EXACT_NUMERIC)) {
+ if (hasFamily(resultType, EXACT_NUMERIC)) {
+ resultType = createCommonExactNumericType(resultType, type);
+ } else if (hasFamily(resultType, APPROXIMATE_NUMERIC)) {
+ // the result is already approximate
+ if (typeRoot == DECIMAL) {
+ // in case of DECIMAL we enforce DOUBLE
+ resultType = new DoubleType();
+ }
+ } else {
+ return null;
+ }
+ }
+ // for APPROXIMATE_NUMERIC types
+ else if (hasFamily(type, APPROXIMATE_NUMERIC)) {
+ if (hasFamily(resultType, APPROXIMATE_NUMERIC)) {
+ resultType = createCommonApproximateNumericType(resultType, type);
+ } else if (hasFamily(resultType, EXACT_NUMERIC)) {
+ // the result was exact so far; inspect resultType because type itself is approximate
+ if (hasRoot(resultType, DECIMAL)) {
+ // in case of a DECIMAL result we enforce DOUBLE (mirrors the EXACT_NUMERIC branch)
+ resultType = new DoubleType();
+ } else {
+ // enforce an approximate result
+ resultType = type;
+ }
+ } else {
+ return null;
+ }
+ }
+ // for TIME
+ else if (hasFamily(type, TIME)) {
+ if (hasFamily(resultType, TIME)) {
+ resultType = new TimeType(combinePrecision(resultType, type));
+ } else {
+ return null;
+ }
+ }
+ // for TIMESTAMP
+ else if (hasFamily(type, TIMESTAMP)) {
+ if (hasFamily(resultType, TIMESTAMP)) {
+ resultType = createCommonTimestampType(resultType, type);
+ } else {
+ return null;
+ }
+ }
+ // for day-time intervals
+ else if (typeRoot == INTERVAL_DAY_TIME) {
+ resultType = createCommonDayTimeIntervalType(
+ (DayTimeIntervalType) resultType,
+ (DayTimeIntervalType) type);
+ }
+ // for year-month intervals
+ else if (typeRoot == INTERVAL_YEAR_MONTH) {
+ resultType = createCommonYearMonthIntervalType(
+ (YearMonthIntervalType) resultType,
+ (YearMonthIntervalType) type);
+ }
+ // other types are handled by findCommonCastableType
+ else {
+ return null;
+ }
+ }
+
+ // NULL type only
+ if (resultType == null && hasNullType) {
+ return new NullType();
+ }
+
+ return resultType;
+ }
+
+ private static boolean areSimilarTypes(LogicalType left, LogicalType right) {
+ // two types are similar iff they can be the operands of an SQL equality predicate
+
+ // similarity based on families
+ if (hasFamily(left, CHARACTER_STRING) && hasFamily(right, CHARACTER_STRING)) {
+ return true;
+ } else if (hasFamily(left, BINARY_STRING) && hasFamily(right, BINARY_STRING)) {
+ return true;
+ } else if (hasFamily(left, NUMERIC) && hasFamily(right, NUMERIC)) {
+ return true;
+ } else if (hasFamily(left, TIME) && hasFamily(right, TIME)) {
+ return true;
+ } else if (hasFamily(left, TIMESTAMP) && hasFamily(right, TIMESTAMP)) {
+ return true;
+ }
+ // similarity based on root
+ return left.getTypeRoot() == right.getTypeRoot();
+ }
+
+ private static @Nullable LogicalType findExactlySameType(List<LogicalType> normalizedTypes) {
+ final LogicalType firstType = normalizedTypes.get(0);
+ for (LogicalType type : normalizedTypes) {
+ if (!type.equals(firstType)) {
+ return null;
+ }
+ }
+ return firstType;
+ }
+
+ private static @Nullable LogicalType findCommonTypePattern(LogicalType resultType, LogicalType type) {
+ if (hasFamily(resultType, DATETIME) && hasFamily(type, INTERVAL)) {
+ return resultType;
+ } else if (hasFamily(resultType, INTERVAL) && hasFamily(type, DATETIME)) {
+ return type;
+ } else if ((hasFamily(resultType, TIMESTAMP) || hasRoot(resultType, DATE)) && hasFamily(type, EXACT_NUMERIC)) {
+ return resultType;
+ } else if (hasFamily(resultType, EXACT_NUMERIC) && (hasFamily(type, TIMESTAMP) || hasRoot(type, DATE))) {
+ return type;
+ }
+ // for "DATETIME + EXACT_NUMERIC", EXACT_NUMERIC is always treated as an interval of days
+ // therefore, TIME + EXACT_NUMERIC is not supported
+ return null;
+ }
+
+ private static @Nullable LogicalType findCommonArrayType(List<LogicalType> normalizedTypes) {
+ final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+ if (children == null) {
+ return null;
+ }
+ return new ArrayType(children.get(0));
+ }
+
+ private static @Nullable LogicalType findCommonMultisetType(List<LogicalType> normalizedTypes) {
+ final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+ if (children == null) {
+ return null;
+ }
+ return new MultisetType(children.get(0));
+ }
+
+ private static @Nullable LogicalType findCommonMapType(List<LogicalType> normalizedTypes) {
+ final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+ if (children == null) {
+ return null;
+ }
+ return new MapType(children.get(0), children.get(1));
+ }
+
+ private static @Nullable LogicalType findCommonRowType(List<LogicalType> normalizedTypes) {
+ final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+ if (children == null) {
+ return null;
+ }
+ final RowType firstType = (RowType) normalizedTypes.get(0);
+ final List<RowType.RowField> newFields = IntStream.range(0, children.size())
+ .mapToObj(pos -> {
+ final LogicalType newType = children.get(pos);
+ final RowType.RowField originalField = firstType.getFields().get(pos);
+ if (originalField.getDescription().isPresent()) {
+ return new RowType.RowField(
+ originalField.getName(),
+ newType,
+ originalField.getDescription().get());
+ } else {
+ return new RowType.RowField(
+ originalField.getName(),
+ newType);
+ }
+ })
+ .collect(Collectors.toList());
+ return new RowType(newFields);
+ }
+
+ private static @Nullable List<LogicalType> findCommonChildrenTypes(List<LogicalType> normalizedTypes) {
+ final LogicalType firstType = normalizedTypes.get(0);
+ final LogicalTypeRoot typeRoot = firstType.getTypeRoot();
+ final int numberOfChildren = firstType.getChildren().size();
+
+ for (LogicalType type : normalizedTypes) {
+ // all types must have the same root
+ if (type.getTypeRoot() != typeRoot) {
+ return null;
+ }
+ // all types must have the same number of children
+ if (type.getChildren().size() != numberOfChildren) {
+ return null;
+ }
+ }
+
+ // recursively compute column-wise least restrictive
+ final List<LogicalType> resultChildren = new ArrayList<>(numberOfChildren);
+ for (int i = 0; i < numberOfChildren; i++) {
+ final Optional<LogicalType> childType = findCommonType(new ChildTypeView(normalizedTypes, i));
+ if (!childType.isPresent()) {
+ return null;
+ }
+ resultChildren.add(childType.get());
+ }
+ // no child should be empty at this point
+ return resultChildren;
+ }
+
+ private static LogicalType createCommonExactNumericType(LogicalType resultType, LogicalType type) {
+ // same EXACT_NUMERIC types
+ if (type.equals(resultType)) {
+ return resultType;
+ }
+
+ final LogicalTypeRoot resultTypeRoot = resultType.getTypeRoot();
+ final LogicalTypeRoot typeRoot = type.getTypeRoot();
+
+ // no DECIMAL types involved
+ if (resultTypeRoot != DECIMAL && typeRoot != DECIMAL) {
+ // keep whichever operand reports the larger precision; ties keep the current result type
+ if (getPrecision(type) > getPrecision(resultType)) {
+ return type;
+ }
+ return resultType;
+ }
+
+ // determine DECIMAL with precision (p), scale (s) and number of whole digits (d):
+ // d = max(p1 - s1, p2 - s2)
+ // s <= max(s1, s2)
+ // p = s + d
+ final int p1 = getPrecision(resultType);
+ final int p2 = getPrecision(type);
+ final int s1 = getScale(resultType);
+ final int s2 = getScale(type);
+ final int maxPrecision = DecimalType.MAX_PRECISION;
+
+ int d = Math.max(p1 - s1, p2 - s2);
+ d = Math.min(d, maxPrecision);
+
+ int s = Math.max(s1, s2);
+ s = Math.min(s, maxPrecision - d);
+
+ final int p = d + s;
+
+ return new DecimalType(p, s);
+ }
+
+ private static LogicalType createCommonApproximateNumericType(LogicalType resultType, LogicalType type) {
+ if (hasRoot(resultType, DOUBLE) || hasRoot(type, DOUBLE)) {
+ return new DoubleType();
+ }
+ return resultType;
+ }
+
+ private static LogicalType createCommonTimestampType(LogicalType resultType, LogicalType type) {
+ // same types
+ if (type.equals(resultType)) {
+ return resultType;
+ }
+
+ final LogicalTypeRoot resultTypeRoot = resultType.getTypeRoot();
+ final LogicalTypeRoot typeRoot = type.getTypeRoot();
+ final int precision = combinePrecision(resultType, type);
+
+ // same type roots
+ if (typeRoot == resultTypeRoot) {
+ return createTimestampType(resultTypeRoot, precision);
+ }
+
+ // generalize to zoned type
+ if (typeRoot == TIMESTAMP_WITH_TIME_ZONE ||
+ resultTypeRoot == TIMESTAMP_WITH_TIME_ZONE) {
+ return createTimestampType(TIMESTAMP_WITH_TIME_ZONE, precision);
+ } else if (typeRoot == TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
+ resultTypeRoot == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+ return createTimestampType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, precision);
+ }
+ return createTimestampType(TIMESTAMP_WITHOUT_TIME_ZONE, precision);
+ }
+
+ private static LogicalType createCommonDayTimeIntervalType(DayTimeIntervalType resultType, DayTimeIntervalType type) {
+ final int maxDayPrecision = Math.max(resultType.getDayPrecision(), type.getDayPrecision());
+ final int maxFractionalPrecision = Math.max(resultType.getFractionalPrecision(), type.getFractionalPrecision());
+ return new DayTimeIntervalType(
+ combineIntervalResolutions(
+ DayTimeResolution.values(),
+ DAY_TIME_RES_TO_BOUNDARIES,
+ DAY_TIME_BOUNDARIES_TO_RES,
+ resultType.getResolution(),
+ type.getResolution()),
+ maxDayPrecision,
+ maxFractionalPrecision);
+ }
+
+ private static LogicalType createCommonYearMonthIntervalType(YearMonthIntervalType resultType, YearMonthIntervalType type) {
+ final int maxYearPrecision = Math.max(resultType.getYearPrecision(), type.getYearPrecision());
+ return new YearMonthIntervalType(
+ combineIntervalResolutions(
+ YearMonthResolution.values(),
+ YEAR_MONTH_RES_TO_BOUNDARIES,
+ YEAR_MONTH_BOUNDARIES_TO_RES,
+ resultType.getResolution(),
+ type.getResolution()),
+ maxYearPrecision);
+ }
+
+ private static LogicalType createTimestampType(LogicalTypeRoot typeRoot, int precision) {
+ switch (typeRoot) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return new TimestampType(precision);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ return new ZonedTimestampType(precision);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return new LocalZonedTimestampType(precision);
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private static LogicalType createStringType(LogicalTypeRoot typeRoot, int length) {
+ switch (typeRoot) {
+ case CHAR:
+ return new CharType(length);
+ case VARCHAR:
+ return new VarCharType(length);
+ case BINARY:
+ return new BinaryType(length);
+ case VARBINARY:
+ return new VarBinaryType(length);
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private static <T extends Enum<T>> T combineIntervalResolutions(
+ T[] res,
+ Map<T, List<T>> resToBoundaries,
+ Map<List<T>, T> boundariesToRes,
+ T left,
+ T right) {
+ final List<T> leftBoundaries = resToBoundaries.get(left);
+ final T leftStart = leftBoundaries.get(0);
+ final T leftEnd = leftBoundaries.get(leftBoundaries.size() - 1);
+
+ final List<T> rightBoundaries = resToBoundaries.get(right);
+ final T rightStart = rightBoundaries.get(0);
+ final T rightEnd = rightBoundaries.get(rightBoundaries.size() - 1);
+
+ final T combinedStart = res[Math.min(leftStart.ordinal(), rightStart.ordinal())];
+ final T combinedEnd = res[Math.max(leftEnd.ordinal(), rightEnd.ordinal())];
+
+ if (combinedStart == combinedEnd) {
+ return boundariesToRes.get(Collections.singletonList(combinedStart));
+ }
+ return boundariesToRes.get(Arrays.asList(combinedStart, combinedEnd));
+ }
+
+ private static int combinePrecision(LogicalType resultType, LogicalType type) {
+ final int p1 = getPrecision(resultType);
+ final int p2 = getPrecision(type);
+
+ return Math.max(p1, p2);
+ }
+
+ private static int combineLength(LogicalType resultType, LogicalType right) {
+ return Math.max(getLength(resultType), getLength(right));
+ }
+
+ /**
+ * A list that creates a view of all children at the given position.
+ */
+ private static class ChildTypeView extends AbstractList<LogicalType> {
+
+ private final List<LogicalType> types;
+ private final int childPos;
+
+ ChildTypeView(List<LogicalType> types, int childPos) {
+ this.types = types;
+ this.childPos = childPos;
+ }
+
+ @Override
+ public LogicalType get(int index) {
+ return types.get(index).getChildren().get(childPos);
+ }
+
+ @Override
+ public int size() {
+ return types.size();
+ }
+ }
+
+ private LogicalTypeGeneralization() {
+ // no instantiation
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
new file mode 100644
index 0000000..44209e7
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeGeneralization}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeGeneralizationTest {
+
+ @Parameters(name = "{index}: [Types: {0}, To: {1}]")
+ public static List<Object[]> testData() {
+ return Arrays.asList(
+ new Object[][]{
+
+ // simple types
+ {
+ Arrays.asList(new IntType(), new IntType()),
+ new IntType()
+ },
+
+ // incompatible types
+ {
+ Arrays.asList(new IntType(), new ArrayType(new IntType())),
+ null
+ },
+
+ // incompatible types
+ {
+ Arrays.asList(new IntType(), new VarCharType(23)),
+ null
+ },
+
+ // NOT NULL types
+ {
+ Arrays.asList(new IntType(false), new IntType(false)),
+ new IntType(false)
+ },
+
+ // NOT NULL with different types
+ {
+ Arrays.asList(new IntType(true), new BigIntType(false)),
+ new BigIntType()
+ },
+
+ // NULL only
+ {
+ Arrays.asList(new NullType(), new NullType()),
+ new NullType()
+ },
+
+ // NULL with other types
+ {
+ Arrays.asList(new NullType(), new IntType(), new IntType()),
+ new IntType()
+ },
+
+ // ARRAY types with same element type
+ {
+ Arrays.asList(new ArrayType(new IntType()), new ArrayType(new IntType())),
+ new ArrayType(new IntType())
+ },
+
+ // ARRAY types with different element types
+ {
+ Arrays.asList(new ArrayType(new BigIntType()), new ArrayType(new IntType())),
+ new ArrayType(new BigIntType())
+ },
+
+ // MULTISET types with different element type
+ {
+ Arrays.asList(new MultisetType(new BigIntType()), new MultisetType(new IntType())),
+ new MultisetType(new BigIntType())
+ },
+
+ // MAP types with different element type
+ {
+ Arrays.asList(
+ new MapType(new BigIntType(), new DoubleType()),
+ new MapType(new IntType(), new DoubleType())),
+ new MapType(new BigIntType(), new DoubleType())
+ },
+
+ // ROW type with different element types
+ {
+ Arrays.asList(
+ RowType.of(new IntType(), new IntType(), new BigIntType()),
+ RowType.of(new BigIntType(), new IntType(), new IntType())),
+ RowType.of(new BigIntType(), new IntType(), new BigIntType())
+ },
+
+ // CHAR types of different length
+ {
+ Arrays.asList(new CharType(2), new CharType(4)),
+ new CharType(4)
+ },
+
+ // VARCHAR types of different length
+ {
+ Arrays.asList(new VarCharType(2), new VarCharType(VarCharType.MAX_LENGTH)),
+ new VarCharType(VarCharType.MAX_LENGTH)
+ },
+
+ // mixed VARCHAR and CHAR types
+ {
+ Arrays.asList(new VarCharType(2), new CharType(5)),
+ new VarCharType(5)
+ },
+
+ // more mixed VARCHAR and CHAR types
+ {
+ Arrays.asList(new CharType(5), new VarCharType(2), new VarCharType(7)),
+ new VarCharType(7)
+ },
+
+ // mixed BINARY and VARBINARY types
+ {
+ Arrays.asList(new BinaryType(5), new VarBinaryType(2), new VarBinaryType(7)),
+ new VarBinaryType(7)
+ },
+
+ // two APPROXIMATE_NUMERIC types
+ {
+ Arrays.asList(new DoubleType(), new FloatType()),
+ new DoubleType()
+ },
+
+ // one APPROXIMATE_NUMERIC and one DECIMAL type
+ {
+ Arrays.asList(new DoubleType(), new DecimalType(2, 2)),
+ new DoubleType()
+ },
+
+ // one APPROXIMATE_NUMERIC and one EXACT_NUMERIC type
+ {
+ Arrays.asList(new DoubleType(), new IntType()),
+ new DoubleType()
+ },
+
+ // two APPROXIMATE_NUMERIC and one DECIMAL type
+ {
+ Arrays.asList(new DecimalType(2, 2), new DoubleType(), new FloatType()),
+ new DoubleType()
+ },
+
+ // DECIMAL precision and scale merging
+ {
+ Arrays.asList(new DecimalType(2, 2), new DecimalType(5, 2), new DecimalType(7, 5)),
+ new DecimalType(8, 5)
+ },
+
+ // DECIMAL precision and scale merging with other EXACT_NUMERIC types
+ {
+ Arrays.asList(new DecimalType(2, 2), new IntType(), new BigIntType()),
+ new DecimalType(21, 2)
+ },
+
+ // unsupported time merging
+ {
+ Arrays.asList(new DateType(), new DateType(), new TimeType()),
+ null
+ },
+
+ // time precision merging
+ {
+ Arrays.asList(new TimeType(3), new TimeType(5), new TimeType(2)),
+ new TimeType(5)
+ },
+
+ // timestamp precision merging
+ {
+ Arrays.asList(new TimestampType(3), new TimestampType(5), new TimestampType(2)),
+ new TimestampType(5)
+ },
+
+ // timestamp merging
+ {
+ Arrays.asList(new TimestampType(3), new ZonedTimestampType(5), new LocalZonedTimestampType(2)),
+ new ZonedTimestampType(5)
+ },
+
+ // timestamp merging
+ {
+ Arrays.asList(new TimestampType(3), new LocalZonedTimestampType(2)),
+ new LocalZonedTimestampType(3)
+ },
+
+ // day-time interval + DATETIME
+ {
+ Arrays.asList(new DayTimeIntervalType(DayTimeResolution.DAY), new DateType()),
+ new DateType()
+ },
+
+ // year-month interval + DATETIME
+ {
+ Arrays.asList(new YearMonthIntervalType(YearMonthResolution.MONTH), new DateType()),
+ new DateType()
+ },
+
+ // DATETIME + INTERVAL
+ {
+ Arrays.asList(new TimeType(), new DayTimeIntervalType(DayTimeResolution.MINUTE)),
+ new TimeType()
+ },
+
+ // EXACT_NUMERIC + DATE
+ {
+ Arrays.asList(new IntType(), new DateType()),
+ new DateType()
+ },
+
+ // TIME + EXACT_NUMERIC
+ {
+ Arrays.asList(new TimeType(), new DecimalType()),
+ null
+ },
+
+ // TIMESTAMP + EXACT_NUMERIC
+ {
+ Arrays.asList(new TimestampType(), new DecimalType()),
+ new TimestampType()
+ },
+
+ // day-time intervals
+ {
+ Arrays.asList(
+ new DayTimeIntervalType(DayTimeResolution.DAY_TO_MINUTE),
+ new DayTimeIntervalType(DayTimeResolution.SECOND)),
+ new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND)
+ },
+
+ // day-time intervals
+ {
+ Arrays.asList(
+ new DayTimeIntervalType(DayTimeResolution.HOUR),
+ new DayTimeIntervalType(
+ DayTimeResolution.SECOND,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ 0)),
+ new DayTimeIntervalType(
+ DayTimeResolution.HOUR_TO_SECOND,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ 6)
+ },
+
+ // year-month intervals
+ {
+ Arrays.asList(
+ new YearMonthIntervalType(YearMonthResolution.MONTH),
+ new YearMonthIntervalType(YearMonthResolution.YEAR)),
+ new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH)
+ },
+
+ }
+ );
+ }
+
+ @Parameter
+ public List<LogicalType> types;
+
+ @Parameter(1)
+ public LogicalType commonType;
+
+ @Test
+ public void testCommonType() {
+ assertThat(
+ LogicalTypeGeneralization.findCommonType(types),
+ equalTo(Optional.ofNullable(commonType)));
+ }
+}