You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/23 08:01:31 UTC

[flink] 06/06: [FLINK-12254][table-common] Add a converter between old type information behavior and data type

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 185ed0e2350b132bbec20b54e6cae6fe72710eae
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 15:45:47 2019 +0200

    [FLINK-12254][table-common] Add a converter between old type information behavior and data type
---
 .../types/logical/LegacyTypeInformationType.java   | 120 +++++++
 .../utils/LegacyTypeInfoDataTypeConverter.java     | 352 +++++++++++++++++++++
 .../flink/table/types/utils/TypeConversions.java   |  85 +++++
 .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 +++++++++
 4 files changed, 704 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
new file mode 100644
index 0000000..a6d6a20
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Logical type that wraps a {@link TypeInformation} of the old type system so that it can be
+ * carried through the new type stack. It is a temporary solution: many legacy types can be mapped
+ * directly to the new type system, however, some types such as {@code DECIMAL}, POJOs, or case
+ * classes need special handling and are transported in this wrapper instead.
+ *
+ * <p>This type differs from {@link TypeInformationAnyType}. Instances of this type are allowed to
+ * travel through the stack whereas {@link TypeInformationAnyType} should be resolved eagerly to
+ * {@link AnyType} by the planner.
+ *
+ * <p>This class can be removed once all deprecated methods that take or return
+ * {@link TypeInformation} have been removed.
+ *
+ * @see LegacyTypeInfoDataTypeConverter
+ */
+@Internal
+public final class LegacyTypeInformationType<T> extends LogicalType {
+
+	/** Pattern used for the summary string of this type. */
+	private static final String FORMAT = "LEGACY(%s)";
+
+	/** The wrapped legacy type information; never {@code null}. */
+	private final TypeInformation<T> typeInfo;
+
+	/**
+	 * Creates a legacy type for the given root and type information.
+	 *
+	 * @param logicalTypeRoot root that this legacy type should report
+	 * @param typeInfo legacy type information to wrap, must not be {@code null}
+	 */
+	public LegacyTypeInformationType(LogicalTypeRoot logicalTypeRoot, TypeInformation<T> typeInfo) {
+		super(true, logicalTypeRoot);
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+	}
+
+	/**
+	 * Returns the wrapped legacy type information.
+	 */
+	public TypeInformation<T> getTypeInformation() {
+		return typeInfo;
+	}
+
+	/**
+	 * Creates a new instance with the same root and type information. The requested nullability
+	 * is not applied; the copy is built through the same constructor as the original.
+	 */
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		final LogicalTypeRoot root = getTypeRoot();
+		return new LegacyTypeInformationType<>(root, typeInfo);
+	}
+
+	/**
+	 * Not supported for legacy types.
+	 *
+	 * @throws TableException always
+	 */
+	@Override
+	public String asSerializableString() {
+		throw new TableException("Legacy type information has no serializable string representation.");
+	}
+
+	@Override
+	public String asSummaryString() {
+		return withNullability(FORMAT, typeInfo);
+	}
+
+	/**
+	 * An input class is supported if it is the type class of the wrapped type information
+	 * or a subclass of it.
+	 */
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		final Class<?> typeClass = typeInfo.getTypeClass();
+		return typeClass.isAssignableFrom(clazz);
+	}
+
+	/**
+	 * An output class is supported if it is the type class of the wrapped type information
+	 * or a superclass of it.
+	 */
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		final Class<?> typeClass = typeInfo.getTypeClass();
+		return clazz.isAssignableFrom(typeClass);
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		return typeInfo.getTypeClass();
+	}
+
+	/**
+	 * Legacy types never expose children.
+	 */
+	@Override
+	public List<LogicalType> getChildren() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		// same class and equal according to the base class before comparing the wrapped type info
+		if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+			return false;
+		}
+		final LegacyTypeInformationType<?> other = (LegacyTypeInformationType<?>) o;
+		return typeInfo.equals(other.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), typeInfo);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
new file mode 100644
index 0000000..5c31be1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+
+/**
+ * Converter between {@link TypeInformation} and {@link DataType} that reflects the behavior before
+ * Flink 1.9. The conversion is a 1:1 mapping that allows back-and-forth conversion.
+ *
+ * <p>This converter only exists to still support deprecated methods that take or return {@link TypeInformation}.
+ * Some methods will still support type information in the future, however, the future type information
+ * support will integrate nicer with the new type stack. This converter reflects the old behavior that includes:
+ *
+ * <p>Use old {@code java.sql.*} time classes for time data types.
+ *
+ * <p>Only support millisecond precision for timestamps or day-time intervals.
+ *
+ * <p>Do not support fractional seconds for the time type.
+ *
+ * <p>Let variable precision and scale for decimal types pass through the planner.
+ *
+ * <p>Let POJOs, case classes, and tuples pass through the planner.
+ *
+ * <p>Inconsistent nullability. Most types are nullable even though type information does not support it.
+ *
+ * <p>Distinction between {@link BasicArrayTypeInfo} and {@link ObjectArrayTypeInfo}.
+ */
+@Internal
+public final class LegacyTypeInfoDataTypeConverter {
+
+	private static final Map<TypeInformation<?>, DataType> typeInfoDataTypeMap = new HashMap<>();
+	private static final Map<DataType, TypeInformation<?>> dataTypeTypeInfoMap = new HashMap<>();
+	static {
+		addMapping(Types.STRING, DataTypes.STRING().bridgedTo(String.class));
+		addMapping(Types.BOOLEAN, DataTypes.BOOLEAN().bridgedTo(Boolean.class));
+		addMapping(Types.BYTE, DataTypes.TINYINT().bridgedTo(Byte.class));
+		addMapping(Types.SHORT, DataTypes.SMALLINT().bridgedTo(Short.class));
+		addMapping(Types.INT, DataTypes.INT().bridgedTo(Integer.class));
+		addMapping(Types.LONG, DataTypes.BIGINT().bridgedTo(Long.class));
+		addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
+		addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
+		addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
+		addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class));
+		addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class));
+		addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
+		addMapping(
+			TimeIntervalTypeInfo.INTERVAL_MONTHS,
+			DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class));
+		addMapping(
+			TimeIntervalTypeInfo.INTERVAL_MILLIS,
+			DataTypes.INTERVAL(DataTypes.SECOND(3)).bridgedTo(Long.class));
+		addMapping(
+			PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.BOOLEAN().notNull().bridgedTo(boolean.class)).bridgedTo(boolean[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.BYTES().bridgedTo(byte[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.SMALLINT().notNull().bridgedTo(short.class)).bridgedTo(short[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.INT().notNull().bridgedTo(int.class)).bridgedTo(int[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class)).bridgedTo(long[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class)).bridgedTo(float[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.DOUBLE().notNull().bridgedTo(double.class)).bridgedTo(double[].class));
+	}
+
+	private static void addMapping(TypeInformation<?> typeInfo, DataType dataType) {
+		Preconditions.checkArgument(!typeInfoDataTypeMap.containsKey(typeInfo));
+		typeInfoDataTypeMap.put(typeInfo, dataType);
+		dataTypeTypeInfoMap.put(dataType, typeInfo);
+	}
+
+	public static DataType toDataType(TypeInformation<?> typeInfo) {
+		// time indicators first as their hashCode/equals is shared with those of regular timestamps
+		if (typeInfo instanceof TimeIndicatorTypeInfo) {
+			return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo);
+		}
+
+		final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo);
+		if (foundDataType != null) {
+			return foundDataType;
+		}
+
+		if (typeInfo instanceof RowTypeInfo) {
+			return convertToRowType((RowTypeInfo) typeInfo);
+		}
+
+		else if (typeInfo instanceof ObjectArrayTypeInfo) {
+			return convertToArrayType(
+				typeInfo.getTypeClass(),
+				((ObjectArrayTypeInfo) typeInfo).getComponentInfo());
+		}
+
+		else if (typeInfo instanceof BasicArrayTypeInfo) {
+			return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
+		}
+
+		else if (typeInfo instanceof MultisetTypeInfo) {
+			return convertToMultisetType(((MultisetTypeInfo) typeInfo).getElementTypeInfo());
+		}
+
+		else if (typeInfo instanceof MapTypeInfo) {
+			return convertToMapType((MapTypeInfo) typeInfo);
+		}
+
+		else if (typeInfo instanceof CompositeType) {
+			return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo);
+		}
+
+		return createLegacyType(LogicalTypeRoot.ANY, typeInfo);
+	}
+
+	public static TypeInformation<?> toLegacyTypeInfo(DataType dataType) {
+		// time indicators first as their hashCode/equals is shared with those of regular timestamps
+		if (canConvertToTimeAttributeTypeInfo(dataType)) {
+			return convertToTimeAttributeTypeInfo((TimestampType) dataType.getLogicalType());
+		}
+
+		final TypeInformation<?> foundTypeInfo = dataTypeTypeInfoMap.get(dataType);
+		if (foundTypeInfo != null) {
+			return foundTypeInfo;
+		}
+
+		if (canConvertToLegacyTypeInfo(dataType)) {
+			return convertToLegacyTypeInfo(dataType);
+		}
+
+		else if (canConvertToRowTypeInfo(dataType)) {
+			return convertToRowTypeInfo((FieldsDataType) dataType);
+		}
+
+		// this could also match for basic array type info but this is covered by legacy type info
+		else if (canConvertToObjectArrayTypeInfo(dataType)) {
+			return convertToObjectArrayTypeInfo((CollectionDataType) dataType);
+		}
+
+		else if (canConvertToMultisetTypeInfo(dataType)) {
+			return convertToMultisetTypeInfo((CollectionDataType) dataType);
+		}
+
+		else if (canConvertToMapTypeInfo(dataType)) {
+			return convertToMapTypeInfo((KeyValueDataType) dataType);
+		}
+
+		// makes the any type accessible in the legacy planner
+		else if (canConvertToAnyTypeInfo(dataType)) {
+			return convertToAnyTypeInfo(dataType);
+		}
+
+		throw new TableException(
+			String.format(
+				"Unsupported conversion from data type '%s' to type information. Only data types " +
+					"that originated from type information fully support a reverse conversion.",
+				dataType));
+	}
+
+	private static DataType createLegacyType(LogicalTypeRoot typeRoot, TypeInformation<?> typeInfo) {
+		return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, typeInfo))
+			.bridgedTo(typeInfo.getTypeClass());
+	}
+
+	private static DataType convertToTimeAttributeType(TimeIndicatorTypeInfo timeIndicatorTypeInfo) {
+		final TimestampKind kind;
+		if (timeIndicatorTypeInfo.isEventTime()) {
+			kind = TimestampKind.ROWTIME;
+		} else {
+			kind = TimestampKind.PROCTIME;
+		}
+		return new AtomicDataType(new TimestampType(true, kind, 3))
+			.bridgedTo(java.sql.Timestamp.class);
+	}
+
+	private static boolean canConvertToTimeAttributeTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
+			dataTypeTypeInfoMap.containsKey(dataType) && // checks precision and conversion
+			((TimestampType) dataType.getLogicalType()).getKind() != TimestampKind.REGULAR;
+	}
+
+	private static TypeInformation<?> convertToTimeAttributeTypeInfo(TimestampType timestampType) {
+		if (isRowtimeAttribute(timestampType)) {
+			return TimeIndicatorTypeInfo.ROWTIME_INDICATOR;
+		} else {
+			return TimeIndicatorTypeInfo.PROCTIME_INDICATOR;
+		}
+	}
+
+	private static DataType convertToRowType(RowTypeInfo rowTypeInfo) {
+		final String[] fieldNames = rowTypeInfo.getFieldNames();
+		final DataTypes.Field[] fields = IntStream.range(0, rowTypeInfo.getArity())
+			.mapToObj(i -> {
+				DataType fieldType = toDataType(rowTypeInfo.getTypeAt(i));
+
+				return DataTypes.FIELD(
+					fieldNames[i],
+					fieldType);
+			})
+			.toArray(DataTypes.Field[]::new);
+
+		return DataTypes.ROW(fields).bridgedTo(Row.class);
+	}
+
+	private static boolean canConvertToRowTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW) &&
+			dataType.getConversionClass().equals(Row.class) &&
+			((RowType) dataType.getLogicalType()).getFields().stream()
+				.noneMatch(f -> f.getDescription().isPresent());
+	}
+
+	private static TypeInformation<?> convertToRowTypeInfo(FieldsDataType fieldsDataType) {
+		final RowType rowType = (RowType) fieldsDataType.getLogicalType();
+
+		final String[] fieldNames = rowType.getFields()
+			.stream()
+			.map(RowType.RowField::getName)
+			.toArray(String[]::new);
+
+		final TypeInformation<?>[] fieldTypes = Stream.of(fieldNames)
+			.map(name -> fieldsDataType.getFieldDataTypes().get(name))
+			.map(LegacyTypeInfoDataTypeConverter::toLegacyTypeInfo)
+			.toArray(TypeInformation[]::new);
+
+		return Types.ROW_NAMED(fieldNames, fieldTypes);
+	}
+
+	private static DataType convertToArrayType(Class<?> arrayClass, TypeInformation<?> elementTypeInfo) {
+		return DataTypes.ARRAY(toDataType(elementTypeInfo)).bridgedTo(arrayClass);
+	}
+
+	private static boolean canConvertToObjectArrayTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ARRAY) &&
+			dataType.getConversionClass().isArray();
+	}
+
+	private static TypeInformation<?> convertToObjectArrayTypeInfo(CollectionDataType collectionDataType) {
+		// Types.OBJECT_ARRAY would return a basic type info for strings
+		return ObjectArrayTypeInfo.getInfoFor(
+			toLegacyTypeInfo(collectionDataType.getElementDataType()));
+	}
+
+	private static DataType convertToMultisetType(TypeInformation elementTypeInfo) {
+		return DataTypes.MULTISET(toDataType(elementTypeInfo)).bridgedTo(Map.class);
+	}
+
+	private static boolean canConvertToMultisetTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.MULTISET) &&
+			dataType.getConversionClass() == Map.class;
+	}
+
+	private static TypeInformation<?> convertToMultisetTypeInfo(CollectionDataType collectionDataType) {
+		return new MultisetTypeInfo<>(
+			toLegacyTypeInfo(collectionDataType.getElementDataType()));
+	}
+
+	private static DataType convertToMapType(MapTypeInfo typeInfo) {
+		return DataTypes.MAP(
+				toDataType(typeInfo.getKeyTypeInfo()),
+				toDataType(typeInfo.getValueTypeInfo()))
+			.bridgedTo(Map.class);
+	}
+
+	private static boolean canConvertToMapTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.MAP) &&
+			dataType.getConversionClass() == Map.class;
+	}
+
+	private static TypeInformation<?> convertToMapTypeInfo(KeyValueDataType dataType) {
+		return Types.MAP(
+			toLegacyTypeInfo(dataType.getKeyDataType()),
+			toLegacyTypeInfo(dataType.getValueDataType()));
+	}
+
+	private static boolean canConvertToLegacyTypeInfo(DataType dataType) {
+		return dataType.getLogicalType() instanceof LegacyTypeInformationType;
+	}
+
+	private static TypeInformation<?> convertToLegacyTypeInfo(DataType dataType) {
+		return ((LegacyTypeInformationType) dataType.getLogicalType()).getTypeInformation();
+	}
+
+	private static boolean canConvertToAnyTypeInfo(DataType dataType) {
+		return dataType.getLogicalType() instanceof TypeInformationAnyType &&
+			dataType.getConversionClass().equals(
+				((TypeInformationAnyType) dataType.getLogicalType()).getTypeInformation().getTypeClass());
+	}
+
+	private static TypeInformation<?> convertToAnyTypeInfo(DataType dataType) {
+		return ((TypeInformationAnyType) dataType.getLogicalType()).getTypeInformation();
+	}
+
+	private LegacyTypeInfoDataTypeConverter() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
new file mode 100644
index 0000000..a5c978f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Conversion hub for interoperability of {@link Class}, {@link TypeInformation}, {@link DataType},
+ * and {@link LogicalType}.
+ *
+ * <p>See the corresponding converter classes for more information about how the conversion is
+ * performed.
+ */
+@Internal
+public final class TypeConversions {
+
+	/**
+	 * Converts legacy type information into a data type.
+	 */
+	public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
+		return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);
+	}
+
+	/**
+	 * Converts every element of the given array of legacy type information into a data type.
+	 */
+	public static DataType[] fromLegacyInfoToDataType(TypeInformation<?>[] typeInfo) {
+		return Stream.of(typeInfo)
+			.map(LegacyTypeInfoDataTypeConverter::toDataType)
+			.toArray(DataType[]::new);
+	}
+
+	/**
+	 * Converts a data type into legacy type information.
+	 */
+	public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType) {
+		return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(dataType);
+	}
+
+	/**
+	 * Converts every element of the given array of data types into legacy type information.
+	 */
+	public static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) {
+		return Stream.of(dataType)
+			.map(LegacyTypeInfoDataTypeConverter::toLegacyTypeInfo)
+			.toArray(TypeInformation[]::new);
+	}
+
+	/**
+	 * Extracts a data type from the given class, if one can be extracted.
+	 */
+	public static Optional<DataType> fromClassToDataType(Class<?> clazz) {
+		return ClassDataTypeConverter.extractDataType(clazz);
+	}
+
+	/**
+	 * Converts a logical type into a data type.
+	 */
+	public static DataType fromLogicalToDataType(LogicalType logicalType) {
+		return LogicalTypeDataTypeConverter.toDataType(logicalType);
+	}
+
+	/**
+	 * Converts every element of the given array of logical types into a data type.
+	 */
+	public static DataType[] fromLogicalToDataType(LogicalType[] logicalTypes) {
+		return Stream.of(logicalTypes)
+			.map(logicalType -> LogicalTypeDataTypeConverter.toDataType(logicalType))
+			.toArray(DataType[]::new);
+	}
+
+	/**
+	 * Returns the logical type of the given data type.
+	 */
+	public static LogicalType fromDataToLogicalType(DataType dataType) {
+		return dataType.getLogicalType();
+	}
+
+	/**
+	 * Returns the logical type of every element of the given array of data types.
+	 */
+	public static LogicalType[] fromDataToLogicalType(DataType[] dataTypes) {
+		return Stream.of(dataTypes)
+			.map(DataType::getLogicalType)
+			.toArray(LogicalType[]::new);
+	}
+
+	private TypeConversions() {
+		// no instance
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java
new file mode 100644
index 0000000..fbce34e
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LegacyTypeInfoDataTypeConverter}.
+ *
+ * <p>Every parameter pair is checked in both directions: type information to data type and
+ * data type back to type information.
+ */
+@RunWith(Parameterized.class)
+public class LegacyTypeInfoDataTypeConverterTest {
+
+	/**
+	 * Pairs of legacy type information and the data type it is expected to correspond to.
+	 */
+	@Parameters(name = "[{index}] type info: {0} data type: {1}")
+	public static List<Object[]> typeInfo() {
+		return Arrays.asList(
+			new Object[][]{
+				{Types.STRING, DataTypes.STRING()},
+
+				{Types.BOOLEAN, DataTypes.BOOLEAN()},
+
+				{Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)},
+
+				{
+					Types.GENERIC(LegacyTypeInfoDataTypeConverterTest.class),
+					legacyType(
+						LogicalTypeRoot.ANY,
+						Types.GENERIC(LegacyTypeInfoDataTypeConverterTest.class))
+				},
+
+				{
+					Types.ROW_NAMED(new String[] {"field1", "field2"}, Types.INT, Types.LONG),
+					DataTypes.ROW(
+							FIELD("field1", DataTypes.INT()),
+							FIELD("field2", DataTypes.BIGINT()))
+				},
+
+				{
+					Types.MAP(Types.FLOAT, Types.ROW(Types.BYTE)),
+					DataTypes.MAP(DataTypes.FLOAT(), DataTypes.ROW(FIELD("f0", DataTypes.TINYINT())))
+				},
+
+				{
+					Types.PRIMITIVE_ARRAY(Types.FLOAT),
+					DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class))
+						.bridgedTo(float[].class)
+				},
+
+				{
+					Types.PRIMITIVE_ARRAY(Types.BYTE),
+					DataTypes.BYTES()
+				},
+
+				{
+					Types.OBJECT_ARRAY(Types.PRIMITIVE_ARRAY(Types.FLOAT)),
+					DataTypes.ARRAY(
+						DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class))
+							.bridgedTo(float[].class))
+						.bridgedTo(float[][].class)
+				},
+
+				{
+					BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+					legacyType(
+						LogicalTypeRoot.ARRAY,
+						BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+				},
+
+				{
+					ObjectArrayTypeInfo.getInfoFor(Types.STRING),
+					DataTypes.ARRAY(DataTypes.STRING())
+						.bridgedTo(String[].class)
+				},
+
+				{
+					Types.TUPLE(Types.SHORT, Types.DOUBLE, Types.FLOAT),
+					legacyType(
+						LogicalTypeRoot.STRUCTURED_TYPE,
+						Types.TUPLE(Types.SHORT, Types.DOUBLE, Types.FLOAT))
+				},
+
+				{
+					TimeIndicatorTypeInfo.ROWTIME_INDICATOR,
+					new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
+						.bridgedTo(java.sql.Timestamp.class)
+				}
+			}
+		);
+	}
+
+	/**
+	 * Creates the expected atomic data type that wraps the given type information as a
+	 * {@link LegacyTypeInformationType} with the given root.
+	 */
+	private static DataType legacyType(LogicalTypeRoot root, TypeInformation<?> typeInfo) {
+		return new AtomicDataType(new LegacyTypeInformationType<>(root, typeInfo));
+	}
+
+	@Parameter
+	public TypeInformation<?> inputTypeInfo;
+
+	@Parameter(1)
+	public DataType dataType;
+
+	@Test
+	public void testTypeInfoToDataTypeConversion() {
+		final DataType converted = LegacyTypeInfoDataTypeConverter.toDataType(inputTypeInfo);
+		assertThat(converted, equalTo(dataType));
+	}
+
+	@Test
+	public void testDataTypeToTypeInfoConversion() {
+		final TypeInformation<?> converted = LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(dataType);
+		assertThat(converted, equalTo(inputTypeInfo));
+	}
+}