You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/08 12:28:34 UTC

[flink] branch master updated: [FLINK-12968][table-common] Add an utility for finding a common type from a set of types

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bcd47f  [FLINK-12968][table-common] Add an utility for finding a common type from a set of types
3bcd47f is described below

commit 3bcd47fac93c14b23bfc8d15f1a6dccd607631fb
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 1 12:11:19 2019 +0200

    [FLINK-12968][table-common] Add an utility for finding a common type from a set of types
    
    This closes #8933.
---
 .../table/types/logical/DayTimeIntervalType.java   |   2 +
 .../flink/table/types/logical/LogicalType.java     |   5 +
 .../table/types/logical/YearMonthIntervalType.java |   6 +-
 .../logical/utils/LogicalTypeGeneralization.java   | 658 +++++++++++++++++++++
 .../table/types/LogicalTypeGeneralizationTest.java | 325 ++++++++++
 5 files changed, 994 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
index a9eb21c..85b700b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
@@ -101,6 +101,8 @@ public final class DayTimeIntervalType extends LogicalType {
 
 	/**
 	 * Supported resolutions of this type.
+	 *
+	 * <p>Note: The order of this enum reflects the granularity from coarse to fine.
 	 */
 	public enum DayTimeResolution {
 		DAY,
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 8b03dcf..4e4942a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.types.logical;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
+import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -43,6 +45,9 @@ import java.util.Set;
  * <p>Instances of this class describe the fully parameterized, immutable type with additional
  * information such as numeric precision or expected length.
  *
+ * <p>Contracts how logical types relate to other types are defined by {@link LogicalTypeCasts} and
+ * {@link LogicalTypeGeneralization}.
+ *
  * <p>NOTE: A logical type is just a description of a type, a planner or runtime might not support
  * every type in every logical precision yet!
  */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
index 8891398..7c2622d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
@@ -74,11 +74,13 @@ public final class YearMonthIntervalType extends LogicalType {
 
 	/**
 	 * Supported resolutions of this type.
+	 *
+	 * <p>Note: The order of this enum reflects the granularity from coarse to fine.
 	 */
 	public enum YearMonthResolution {
 		YEAR,
-		MONTH,
-		YEAR_TO_MONTH
+		YEAR_TO_MONTH,
+		MONTH
 	}
 
 	private final YearMonthResolution resolution;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
new file mode 100644
index 0000000..18ffeb3
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeGeneralization.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_MINUTE;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.HOUR;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.HOUR_TO_MINUTE;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.HOUR_TO_SECOND;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.SECOND;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.BINARY_STRING;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.DATETIME;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.EXACT_NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.INTERVAL;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.NUMERIC;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIME;
+import static org.apache.flink.table.types.logical.LogicalTypeFamily.TIMESTAMP;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ANY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_YEAR_MONTH;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.MULTISET;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.NULL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+import static org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.MONTH;
+import static org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.YEAR;
+import static org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utilities for finding a common, more general {@link LogicalType} for a given set of types. If such
+ * a type exists, all given types can be casted to this more general type.
+ *
+ * <p>This class aims to be compatible with the SQL standard. It is inspired by Apache Calcite's
+ * {@code SqlTypeFactoryImpl#leastRestrictive} method.
+ */
+@Internal
+public final class LogicalTypeGeneralization {
+
+	// mappings for interval generalization
+	private static final Map<YearMonthResolution, List<YearMonthResolution>> YEAR_MONTH_RES_TO_BOUNDARIES = new HashMap<>();
+	private static final Map<List<YearMonthResolution>, YearMonthResolution> YEAR_MONTH_BOUNDARIES_TO_RES = new HashMap<>();
+	static {
+		addYearMonthMapping(YEAR, YEAR);
+		addYearMonthMapping(MONTH, MONTH);
+		addYearMonthMapping(YEAR_TO_MONTH, YEAR, MONTH);
+	}
+
+	private static final Map<DayTimeResolution, List<DayTimeResolution>> DAY_TIME_RES_TO_BOUNDARIES = new HashMap<>();
+	private static final Map<List<DayTimeResolution>, DayTimeResolution> DAY_TIME_BOUNDARIES_TO_RES = new HashMap<>();
+	static {
+		addDayTimeMapping(DAY, DAY);
+		addDayTimeMapping(DAY_TO_HOUR, DAY, HOUR);
+		addDayTimeMapping(DAY_TO_MINUTE, DAY, MINUTE);
+		addDayTimeMapping(DAY_TO_SECOND, DAY, SECOND);
+		addDayTimeMapping(HOUR, HOUR);
+		addDayTimeMapping(HOUR_TO_MINUTE, HOUR, MINUTE);
+		addDayTimeMapping(HOUR_TO_SECOND, HOUR, SECOND);
+		addDayTimeMapping(MINUTE, MINUTE);
+		addDayTimeMapping(MINUTE_TO_SECOND, MINUTE, SECOND);
+		addDayTimeMapping(SECOND, SECOND);
+	}
+
+	private static void addYearMonthMapping(YearMonthResolution to, YearMonthResolution... boundaries) {
+		final List<YearMonthResolution> boundariesList = Arrays.asList(boundaries);
+		YEAR_MONTH_RES_TO_BOUNDARIES.put(to, boundariesList);
+		YEAR_MONTH_BOUNDARIES_TO_RES.put(boundariesList, to);
+	}
+
+	private static void addDayTimeMapping(DayTimeResolution to, DayTimeResolution... boundaries) {
+		final List<DayTimeResolution> boundariesList = Arrays.asList(boundaries);
+		DAY_TIME_RES_TO_BOUNDARIES.put(to, boundariesList);
+		DAY_TIME_BOUNDARIES_TO_RES.put(boundariesList, to);
+	}
+
+	/**
+	 * Returns the most common type of a set of types. It determines a type to which all given types
+	 * can be casted.
+	 *
+	 * <p>For example: {@code [INT, BIGINT, DECIMAL(2, 2)]} would lead to {@code DECIMAL(21, 2)}.
+	 */
+	public static Optional<LogicalType> findCommonType(List<LogicalType> types) {
+		Preconditions.checkArgument(types.size() > 0, "List of types must not be empty.");
+
+		// collect statistics first
+		boolean hasAnyType = false;
+		boolean hasNullType = false;
+		boolean hasNullableTypes = false;
+		for (LogicalType type : types) {
+			final LogicalTypeRoot typeRoot = type.getTypeRoot();
+			if (typeRoot == ANY) {
+				hasAnyType = true;
+			} else if (typeRoot == NULL) {
+				hasNullType = true;
+			}
+			if (type.isNullable()) {
+				hasNullableTypes = true;
+			}
+		}
+
+		final List<LogicalType> normalizedTypes = types.stream()
+			.map(t -> t.copy(true))
+			.collect(Collectors.toList());
+
+		LogicalType foundType = findCommonNullableType(normalizedTypes, hasAnyType, hasNullType);
+		if (foundType == null) {
+			foundType = findCommonCastableType(normalizedTypes);
+		}
+
+		if (foundType != null) {
+			final LogicalType typeWithNullability = foundType.copy(hasNullableTypes);
+			return Optional.of(typeWithNullability);
+		}
+		return Optional.empty();
+	}
+
+	private static @Nullable LogicalType findCommonCastableType(List<LogicalType> normalizedTypes) {
+		LogicalType resultType = normalizedTypes.get(0);
+
+		for (LogicalType type : normalizedTypes) {
+			final LogicalTypeRoot typeRoot = type.getTypeRoot();
+
+			// NULL does not affect the result of this loop
+			if (typeRoot == NULL) {
+				continue;
+			}
+
+			if (supportsImplicitCast(resultType, type)) {
+				resultType = type;
+			} else {
+				if (!supportsImplicitCast(type, resultType)) {
+					return null;
+				}
+			}
+		}
+
+		return resultType;
+	}
+
+	@SuppressWarnings("ConstantConditions")
+	private static @Nullable LogicalType findCommonNullableType(
+			List<LogicalType> normalizedTypes,
+			boolean hasAnyType,
+			boolean hasNullType) {
+
+		// all ANY types must be equal
+		if (hasAnyType) {
+			return findExactlySameType(normalizedTypes);
+		}
+
+		LogicalType resultType = null;
+
+		for (LogicalType type : normalizedTypes) {
+			final LogicalTypeRoot typeRoot = type.getTypeRoot();
+
+			// NULL does not affect the result of this loop
+			if (typeRoot == NULL) {
+				continue;
+			}
+
+			// if result type is still null, consider the current type as a potential
+			// result type candidate
+			if (resultType == null) {
+				resultType = type;
+			}
+
+			// find special patterns
+			final LogicalType patternType = findCommonTypePattern(resultType, type);
+			if (patternType != null) {
+				resultType = patternType;
+				continue;
+			}
+
+			// for types of family CONSTRUCTED
+			if (typeRoot == ARRAY) {
+				return findCommonArrayType(normalizedTypes);
+			} else if (typeRoot == MULTISET) {
+				return findCommonMultisetType(normalizedTypes);
+			} else if (typeRoot == MAP) {
+				return findCommonMapType(normalizedTypes);
+			} else if (typeRoot == ROW) {
+				return findCommonRowType(normalizedTypes);
+			}
+
+			// exit if two completely different types are compared (e.g. ROW and INT)
+			// this simplifies the following lines as we compare same interval families for example
+			if (!areSimilarTypes(resultType, type)) {
+				return null;
+			}
+
+			// for types of family CHARACTER_STRING or BINARY_STRING
+			if (hasFamily(type, CHARACTER_STRING) || hasFamily(type, BINARY_STRING)) {
+				final int length = combineLength(resultType, type);
+
+				if (hasRoot(resultType, VARCHAR) || hasRoot(resultType, VARBINARY)) {
+					// for variable length type we are done here
+					resultType = createStringType(resultType.getTypeRoot(), length);
+				} else {
+					// for mixed fixed/variable or fixed/fixed lengths
+					resultType = createStringType(typeRoot, length);
+				}
+			}
+			// for EXACT_NUMERIC types
+			else if (hasFamily(type, EXACT_NUMERIC)) {
+				if (hasFamily(resultType, EXACT_NUMERIC)) {
+					resultType = createCommonExactNumericType(resultType, type);
+				} else if (hasFamily(resultType, APPROXIMATE_NUMERIC)) {
+					// the result is already approximate
+					if (typeRoot == DECIMAL) {
+						// in case of DECIMAL we enforce DOUBLE
+						resultType = new DoubleType();
+					}
+				} else {
+					return null;
+				}
+			}
+			// for APPROXIMATE_NUMERIC types
+			else if (hasFamily(type, APPROXIMATE_NUMERIC)) {
+				if (hasFamily(type, APPROXIMATE_NUMERIC)) {
+					resultType = createCommonApproximateNumericType(resultType, type);
+				} else if (hasFamily(resultType, EXACT_NUMERIC)) {
+					// the result was exact so far
+					if (typeRoot == DECIMAL) {
+						// in case of DECIMAL we enforce DOUBLE
+						resultType = new DoubleType();
+					} else {
+						// enforce an approximate result
+						resultType = type;
+					}
+				} else {
+					return null;
+				}
+			}
+			// for TIME
+			else if (hasFamily(type, TIME)) {
+				if (hasFamily(resultType, TIME)) {
+					resultType = new TimeType(combinePrecision(resultType, type));
+				} else {
+					return null;
+				}
+			}
+			// for TIMESTAMP
+			else if (hasFamily(type, TIMESTAMP)) {
+				if (hasFamily(resultType, TIMESTAMP)) {
+					resultType = createCommonTimestampType(resultType, type);
+				} else {
+					return null;
+				}
+			}
+			// for day-time intervals
+			else if (typeRoot == INTERVAL_DAY_TIME) {
+				resultType = createCommonDayTimeIntervalType(
+					(DayTimeIntervalType) resultType,
+					(DayTimeIntervalType) type);
+			}
+			// for year-month intervals
+			else if (typeRoot == INTERVAL_YEAR_MONTH) {
+				resultType = createCommonYearMonthIntervalType(
+					(YearMonthIntervalType) resultType,
+					(YearMonthIntervalType) type);
+			}
+			// other types are handled by findCommonCastableType
+			else {
+				return null;
+			}
+		}
+
+		// NULL type only
+		if (resultType == null && hasNullType) {
+			return new NullType();
+		}
+
+		return resultType;
+	}
+
+	private static boolean areSimilarTypes(LogicalType left, LogicalType right) {
+		// two types are similar iff they can be the operands of an SQL equality predicate
+
+		// similarity based on families
+		if (hasFamily(left, CHARACTER_STRING) && hasFamily(right, CHARACTER_STRING)) {
+			return true;
+		} else if (hasFamily(left, BINARY_STRING) && hasFamily(right, BINARY_STRING)) {
+			return true;
+		} else if (hasFamily(left, NUMERIC) && hasFamily(right, NUMERIC)) {
+			return true;
+		} else if (hasFamily(left, TIME) && hasFamily(right, TIME)) {
+			return true;
+		} else if (hasFamily(left, TIMESTAMP) && hasFamily(right, TIMESTAMP)) {
+			return true;
+		}
+		// similarity based on root
+		return left.getTypeRoot() == right.getTypeRoot();
+	}
+
+	private static @Nullable LogicalType findExactlySameType(List<LogicalType> normalizedTypes) {
+		final LogicalType firstType = normalizedTypes.get(0);
+		for (LogicalType type : normalizedTypes) {
+			if (!type.equals(firstType)) {
+				return null;
+			}
+		}
+		return firstType;
+	}
+
+	private static @Nullable LogicalType findCommonTypePattern(LogicalType resultType, LogicalType type) {
+		if (hasFamily(resultType, DATETIME) && hasFamily(type, INTERVAL)) {
+			return resultType;
+		} else if (hasFamily(resultType, INTERVAL) && hasFamily(type, DATETIME)) {
+			return type;
+		} else if ((hasFamily(resultType, TIMESTAMP) || hasRoot(resultType, DATE)) && hasFamily(type, EXACT_NUMERIC)) {
+			return resultType;
+		} else if (hasFamily(resultType, EXACT_NUMERIC) && (hasFamily(type, TIMESTAMP) || hasRoot(type, DATE))) {
+			return type;
+		}
+		// for "DATETIME + EXACT_NUMERIC", EXACT_NUMERIC is always treated as an interval of days
+		// therefore, TIME + EXACT_NUMERIC is not supported
+		return null;
+	}
+
+	private static @Nullable LogicalType findCommonArrayType(List<LogicalType> normalizedTypes) {
+		final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+		if (children == null) {
+			return null;
+		}
+		return new ArrayType(children.get(0));
+	}
+
+	private static @Nullable LogicalType findCommonMultisetType(List<LogicalType> normalizedTypes) {
+		final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+		if (children == null) {
+			return null;
+		}
+		return new MultisetType(children.get(0));
+	}
+
+	private static @Nullable LogicalType findCommonMapType(List<LogicalType> normalizedTypes) {
+		final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+		if (children == null) {
+			return null;
+		}
+		return new MapType(children.get(0), children.get(1));
+	}
+
+	private static @Nullable LogicalType findCommonRowType(List<LogicalType> normalizedTypes) {
+		final List<LogicalType> children = findCommonChildrenTypes(normalizedTypes);
+		if (children == null) {
+			return null;
+		}
+		final RowType firstType = (RowType) normalizedTypes.get(0);
+		final List<RowType.RowField> newFields = IntStream.range(0, children.size())
+			.mapToObj(pos -> {
+				final LogicalType newType = children.get(pos);
+				final RowType.RowField originalField = firstType.getFields().get(pos);
+				if (originalField.getDescription().isPresent()) {
+					return new RowType.RowField(
+						originalField.getName(),
+						newType,
+						originalField.getDescription().get());
+				} else {
+					return new RowType.RowField(
+						originalField.getName(),
+						newType);
+				}
+			})
+			.collect(Collectors.toList());
+		return new RowType(newFields);
+	}
+
+	private static @Nullable List<LogicalType> findCommonChildrenTypes(List<LogicalType> normalizedTypes) {
+		final LogicalType firstType = normalizedTypes.get(0);
+		final LogicalTypeRoot typeRoot = firstType.getTypeRoot();
+		final int numberOfChildren = firstType.getChildren().size();
+
+		for (LogicalType type : normalizedTypes) {
+			// all types must have the same root
+			if (type.getTypeRoot() != typeRoot) {
+				return null;
+			}
+			// all types must have the same number of children
+			if (type.getChildren().size() != numberOfChildren) {
+				return null;
+			}
+		}
+
+		// recursively compute column-wise least restrictive
+		final List<LogicalType> resultChildren = new ArrayList<>(numberOfChildren);
+		for (int i = 0; i < numberOfChildren; i++) {
+			final Optional<LogicalType> childType = findCommonType(new ChildTypeView(normalizedTypes, i));
+			if (!childType.isPresent()) {
+				return null;
+			}
+			resultChildren.add(childType.get());
+		}
+		// no child should be empty at this point
+		return resultChildren;
+	}
+
+	private static LogicalType createCommonExactNumericType(LogicalType resultType, LogicalType type) {
+		// same EXACT_NUMERIC types
+		if (type.equals(resultType)) {
+			return resultType;
+		}
+
+		final LogicalTypeRoot resultTypeRoot = resultType.getTypeRoot();
+		final LogicalTypeRoot typeRoot = type.getTypeRoot();
+
+		// no DECIMAL types involved
+		if (resultTypeRoot != DECIMAL && typeRoot != DECIMAL) {
+			// type root contains order of precision
+			if (getPrecision(type) > getPrecision(resultType)) {
+				return type;
+			}
+			return resultType;
+		}
+
+		// determine DECIMAL with precision (p), scale (s) and number of whole digits (d):
+		// d = max(p1 - s1, p2 - s2)
+		// s <= max(s1, s2)
+		// p = s + d
+		final int p1 = getPrecision(resultType);
+		final int p2 = getPrecision(type);
+		final int s1 = getScale(resultType);
+		final int s2 = getScale(type);
+		final int maxPrecision = DecimalType.MAX_PRECISION;
+
+		int d = Math.max(p1 - s1, p2 - s2);
+		d = Math.min(d, maxPrecision);
+
+		int s = Math.max(s1, s2);
+		s = Math.min(s, maxPrecision - d);
+
+		final int p = d + s;
+
+		return new DecimalType(p, s);
+	}
+
+	private static LogicalType createCommonApproximateNumericType(LogicalType resultType, LogicalType type) {
+		if (hasRoot(resultType, DOUBLE) || hasRoot(type, DOUBLE)) {
+			return new DoubleType();
+		}
+		return resultType;
+	}
+
+	private static LogicalType createCommonTimestampType(LogicalType resultType, LogicalType type) {
+		// same types
+		if (type.equals(resultType)) {
+			return resultType;
+		}
+
+		final LogicalTypeRoot resultTypeRoot = resultType.getTypeRoot();
+		final LogicalTypeRoot typeRoot = type.getTypeRoot();
+		final int precision = combinePrecision(resultType, type);
+
+		// same type roots
+		if (typeRoot == resultTypeRoot) {
+			return createTimestampType(resultTypeRoot, precision);
+		}
+
+		// generalize to zoned type
+		if (typeRoot == TIMESTAMP_WITH_TIME_ZONE ||
+				resultTypeRoot == TIMESTAMP_WITH_TIME_ZONE) {
+			return createTimestampType(TIMESTAMP_WITH_TIME_ZONE, precision);
+		} else if (typeRoot == TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
+				resultTypeRoot == TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+			return createTimestampType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, precision);
+		}
+		return createTimestampType(TIMESTAMP_WITHOUT_TIME_ZONE, precision);
+	}
+
+	private static LogicalType createCommonDayTimeIntervalType(DayTimeIntervalType resultType, DayTimeIntervalType type) {
+		final int maxDayPrecision = Math.max(resultType.getDayPrecision(), type.getDayPrecision());
+		final int maxFractionalPrecision = Math.max(resultType.getFractionalPrecision(), type.getFractionalPrecision());
+		return new DayTimeIntervalType(
+			combineIntervalResolutions(
+				DayTimeResolution.values(),
+				DAY_TIME_RES_TO_BOUNDARIES,
+				DAY_TIME_BOUNDARIES_TO_RES,
+				resultType.getResolution(),
+				type.getResolution()),
+			maxDayPrecision,
+			maxFractionalPrecision);
+	}
+
+	private static LogicalType createCommonYearMonthIntervalType(YearMonthIntervalType resultType, YearMonthIntervalType type) {
+		final int maxYearPrecision = Math.max(resultType.getYearPrecision(), type.getYearPrecision());
+		return new YearMonthIntervalType(
+			combineIntervalResolutions(
+				YearMonthResolution.values(),
+				YEAR_MONTH_RES_TO_BOUNDARIES,
+				YEAR_MONTH_BOUNDARIES_TO_RES,
+				resultType.getResolution(),
+				type.getResolution()),
+			maxYearPrecision);
+	}
+
+	private static LogicalType createTimestampType(LogicalTypeRoot typeRoot, int precision) {
+		switch (typeRoot) {
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return new TimestampType(precision);
+			case TIMESTAMP_WITH_TIME_ZONE:
+				return new ZonedTimestampType(precision);
+			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+				return new LocalZonedTimestampType(precision);
+			default:
+				throw new IllegalArgumentException();
+		}
+	}
+
+	private static LogicalType createStringType(LogicalTypeRoot typeRoot, int length) {
+		switch (typeRoot) {
+			case CHAR:
+				return new CharType(length);
+			case VARCHAR:
+				return new VarCharType(length);
+			case BINARY:
+				return new BinaryType(length);
+			case VARBINARY:
+				return new VarBinaryType(length);
+			default:
+				throw new IllegalArgumentException();
+		}
+	}
+
+	private static <T extends Enum<T>> T combineIntervalResolutions(
+			T[] res,
+			Map<T, List<T>> resToBoundaries,
+			Map<List<T>, T> boundariesToRes,
+			T left,
+			T right) {
+		final List<T> leftBoundaries = resToBoundaries.get(left);
+		final T leftStart = leftBoundaries.get(0);
+		final T leftEnd = leftBoundaries.get(leftBoundaries.size() - 1);
+
+		final List<T> rightBoundaries = resToBoundaries.get(right);
+		final T rightStart = rightBoundaries.get(0);
+		final T rightEnd = rightBoundaries.get(rightBoundaries.size() - 1);
+
+		final T combinedStart = res[Math.min(leftStart.ordinal(), rightStart.ordinal())];
+		final T combinedEnd = res[Math.max(leftEnd.ordinal(), rightEnd.ordinal())];
+
+		if (combinedStart == combinedEnd) {
+			return boundariesToRes.get(Collections.singletonList(combinedStart));
+		}
+		return boundariesToRes.get(Arrays.asList(combinedStart, combinedEnd));
+	}
+
+	private static int combinePrecision(LogicalType resultType, LogicalType type) {
+		final int p1 = getPrecision(resultType);
+		final int p2 = getPrecision(type);
+
+		return Math.max(p1, p2);
+	}
+
+	private static int combineLength(LogicalType resultType, LogicalType right) {
+		return Math.max(getLength(resultType), getLength(right));
+	}
+
+	/**
+	 * A list that creates a view of all children at the given position.
+	 */
+	private static class ChildTypeView extends AbstractList<LogicalType> {
+
+		private final List<LogicalType> types;
+		private final int childPos;
+
+		ChildTypeView(List<LogicalType> types, int childPos) {
+			this.types = types;
+			this.childPos = childPos;
+		}
+
+		@Override
+		public LogicalType get(int index) {
+			return types.get(index).getChildren().get(childPos);
+		}
+
+		@Override
+		public int size() {
+			return types.size();
+		}
+	}
+
+	private LogicalTypeGeneralization() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
new file mode 100644
index 0000000..44209e7
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeGeneralizationTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeGeneralization}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeGeneralizationTest {
+
+	@Parameters(name = "{index}: [Types: {0}, To: {1}]")
+	public static List<Object[]> testData() {
+		return Arrays.asList(
+			new Object[][]{
+
+				// simple types
+				{
+					Arrays.asList(new IntType(), new IntType()),
+					new IntType()
+				},
+
+				// incompatible types
+				{
+					Arrays.asList(new IntType(), new ArrayType(new IntType())),
+					null
+				},
+
+				// incompatible types
+				{
+					Arrays.asList(new IntType(), new VarCharType(23)),
+					null
+				},
+
+				// NOT NULL types
+				{
+					Arrays.asList(new IntType(false), new IntType(false)),
+					new IntType(false)
+				},
+
+				// NOT NULL with different types
+				{
+					Arrays.asList(new IntType(true), new BigIntType(false)),
+					new BigIntType()
+				},
+
+				// NULL only
+				{
+					Arrays.asList(new NullType(), new NullType()),
+					new NullType()
+				},
+
+				// NULL with other types
+				{
+					Arrays.asList(new NullType(), new IntType(), new IntType()),
+					new IntType()
+				},
+
+				// ARRAY types with same element type
+				{
+					Arrays.asList(new ArrayType(new IntType()), new ArrayType(new IntType())),
+					new ArrayType(new IntType())
+				},
+
+				// ARRAY types with different element types
+				{
+					Arrays.asList(new ArrayType(new BigIntType()), new ArrayType(new IntType())),
+					new ArrayType(new BigIntType())
+				},
+
+				// MULTISET types with different element type
+				{
+					Arrays.asList(new MultisetType(new BigIntType()), new MultisetType(new IntType())),
+					new MultisetType(new BigIntType())
+				},
+
+				// MAP types with different element type
+				{
+					Arrays.asList(
+						new MapType(new BigIntType(), new DoubleType()),
+						new MapType(new IntType(), new DoubleType())),
+					new MapType(new BigIntType(), new DoubleType())
+				},
+
+				// ROW type with different element types
+				{
+					Arrays.asList(
+						RowType.of(new IntType(), new IntType(), new BigIntType()),
+						RowType.of(new BigIntType(), new IntType(), new IntType())),
+					RowType.of(new BigIntType(), new IntType(), new BigIntType())
+				},
+
+				// CHAR types of different length
+				{
+					Arrays.asList(new CharType(2), new CharType(4)),
+					new CharType(4)
+				},
+
+				// VARCHAR types of different length
+				{
+					Arrays.asList(new VarCharType(2), new VarCharType(VarCharType.MAX_LENGTH)),
+					new VarCharType(VarCharType.MAX_LENGTH)
+				},
+
+				// mixed VARCHAR and CHAR types
+				{
+					Arrays.asList(new VarCharType(2), new CharType(5)),
+					new VarCharType(5)
+				},
+
+				// more mixed VARCHAR and CHAR types
+				{
+					Arrays.asList(new CharType(5), new VarCharType(2), new VarCharType(7)),
+					new VarCharType(7)
+				},
+
+				// mixed BINARY and VARBINARY types
+				{
+					Arrays.asList(new BinaryType(5), new VarBinaryType(2), new VarBinaryType(7)),
+					new VarBinaryType(7)
+				},
+
+				// two APPROXIMATE_NUMERIC types
+				{
+					Arrays.asList(new DoubleType(), new FloatType()),
+					new DoubleType()
+				},
+
+				// one APPROXIMATE_NUMERIC and one DECIMAL type
+				{
+					Arrays.asList(new DoubleType(), new DecimalType(2, 2)),
+					new DoubleType()
+				},
+
+				// one APPROXIMATE_NUMERIC and one EXACT_NUMERIC type
+				{
+					Arrays.asList(new DoubleType(), new IntType()),
+					new DoubleType()
+				},
+
+				// two APPROXIMATE_NUMERIC and one DECIMAL type
+				{
+					Arrays.asList(new DecimalType(2, 2), new DoubleType(), new FloatType()),
+					new DoubleType()
+				},
+
+				// DECIMAL precision and scale merging
+				{
+					Arrays.asList(new DecimalType(2, 2), new DecimalType(5, 2), new DecimalType(7, 5)),
+					new DecimalType(8, 5)
+				},
+
+				// DECIMAL precision and scale merging with other EXACT_NUMERIC types
+				{
+					Arrays.asList(new DecimalType(2, 2), new IntType(), new BigIntType()),
+					new DecimalType(21, 2)
+				},
+
+				// unsupported time merging
+				{
+					Arrays.asList(new DateType(), new DateType(), new TimeType()),
+					null
+				},
+
+				// time precision merging
+				{
+					Arrays.asList(new TimeType(3), new TimeType(5), new TimeType(2)),
+					new TimeType(5)
+				},
+
+				// timestamp precision merging
+				{
+					Arrays.asList(new TimestampType(3), new TimestampType(5), new TimestampType(2)),
+					new TimestampType(5)
+				},
+
+				// timestamp merging
+				{
+					Arrays.asList(new TimestampType(3), new ZonedTimestampType(5), new LocalZonedTimestampType(2)),
+					new ZonedTimestampType(5)
+				},
+
+				// timestamp merging
+				{
+					Arrays.asList(new TimestampType(3), new LocalZonedTimestampType(2)),
+					new LocalZonedTimestampType(3)
+				},
+
+				// day-time interval + DATETIME
+				{
+					Arrays.asList(new DayTimeIntervalType(DayTimeResolution.DAY), new DateType()),
+					new DateType()
+				},
+
+				// year-month interval + DATETIME
+				{
+					Arrays.asList(new YearMonthIntervalType(YearMonthResolution.MONTH), new DateType()),
+					new DateType()
+				},
+
+				// DATETIME + INTERVAL
+				{
+					Arrays.asList(new TimeType(), new DayTimeIntervalType(DayTimeResolution.MINUTE)),
+					new TimeType()
+				},
+
+				// EXACT_NUMERIC + DATE
+				{
+					Arrays.asList(new IntType(), new DateType()),
+					new DateType()
+				},
+
+				// TIME + EXACT_NUMERIC
+				{
+					Arrays.asList(new TimeType(), new DecimalType()),
+					null
+				},
+
+				// TIMESTAMP + EXACT_NUMERIC
+				{
+					Arrays.asList(new TimestampType(), new DecimalType()),
+					new TimestampType()
+				},
+
+				// day-time intervals
+				{
+					Arrays.asList(
+						new DayTimeIntervalType(DayTimeResolution.DAY_TO_MINUTE),
+						new DayTimeIntervalType(DayTimeResolution.SECOND)),
+					new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND)
+				},
+
+				// day-time intervals
+				{
+					Arrays.asList(
+						new DayTimeIntervalType(DayTimeResolution.HOUR),
+						new DayTimeIntervalType(
+							DayTimeResolution.SECOND,
+							DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+							0)),
+					new DayTimeIntervalType(
+						DayTimeResolution.HOUR_TO_SECOND,
+						DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+						6)
+				},
+
+				// year-month intervals
+				{
+					Arrays.asList(
+						new YearMonthIntervalType(YearMonthResolution.MONTH),
+						new YearMonthIntervalType(YearMonthResolution.YEAR)),
+					new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH)
+				},
+
+			}
+		);
+	}
+
+	@Parameter
+	public List<LogicalType> types;
+
+	@Parameter(1)
+	public LogicalType commonType;
+
+	@Test
+	public void testCommonType() {
+		assertThat(
+			LogicalTypeGeneralization.findCommonType(types),
+			equalTo(Optional.ofNullable(commonType)));
+	}
+}