You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/09 12:36:24 UTC

[flink] 02/06: [FLINK-12393][table-common] Add the user-facing classes of the new type system

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1cb9d680cbbc46e67de10acc99c5fb510b9382e5
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri May 3 13:52:23 2019 +0200

    [FLINK-12393][table-common] Add the user-facing classes of the new type system
    
    Introduces the DataType class and subclasses. Users are able to do a star import
    of DataTypes and declare types like: `MULTISET(MULTISET(INT())`. Close to SQL. As
    mentioned in FLIP-37, data types allow to specify format hints to the planner
    using `TIMESTAMP(9).bridgedTo(java.sql.Timestamp)`.
    
    This closes #8360.
---
 .../java/org/apache/flink/table/api/DataTypes.java | 740 +++++++++++++++++++++
 .../apache/flink/table/types/AtomicDataType.java   |  64 ++
 .../flink/table/types/CollectionDataType.java      | 113 ++++
 .../org/apache/flink/table/types/DataType.java     | 163 +++++
 .../apache/flink/table/types/FieldsDataType.java   | 106 +++
 .../apache/flink/table/types/KeyValueDataType.java | 113 ++++
 .../org/apache/flink/table/types/DataTypeTest.java | 131 ++++
 .../apache/flink/table/types/DataTypesTest.java    | 211 ++++++
 .../apache/flink/table/types/TypeTestingUtils.java |  70 ++
 9 files changed, 1711 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
new file mode 100644
index 0000000..2591227
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link DataType} can be used to declare input and/or output types of operations. This class
+ * enumerates all supported data types of the Table & SQL API.
+ */
+@PublicEvolving
+public final class DataTypes {
+
+	// we use SQL-like naming for data types and avoid Java keyword clashes
+	// CHECKSTYLE.OFF: MethodName
+
+	/**
+	 * Data type of a fixed-length character string {@code CHAR(n)} where {@code n} is the number
+	 * of code points. {@code n} must have a value between 1 and 255 (both inclusive).
+	 *
+	 * @see CharType
+	 */
+	public static DataType CHAR(int n) {
+		return new AtomicDataType(new CharType(n));
+	}
+
+	/**
+	 * Data type of a variable-length character string {@code VARCHAR(n)} where {@code n} is the
+	 * maximum number of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+	 * (both inclusive).
+	 *
+	 * @see VarCharType
+	 */
+	public static DataType VARCHAR(int n) {
+		return new AtomicDataType(new VarCharType(n));
+	}
+
+	/**
+	 * Data type of a variable-length character string with defined maximum length. This is a shortcut
+	 * for {@code VARCHAR(2147483647)} for representing JVM strings.
+	 *
+	 * @see VarCharType
+	 */
+	public static DataType STRING() {
+		return VARCHAR(Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Data type of a boolean with a (possibly) three-valued logic of {@code TRUE, FALSE, UNKNOWN}.
+	 *
+	 * @see BooleanType
+	 */
+	public static DataType BOOLEAN() {
+		return new AtomicDataType(new BooleanType());
+	}
+
+	/**
+	 * Data type of a fixed-length binary string (=a sequence of bytes) {@code BINARY(n)} where
+	 * {@code n} is the number of bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+	 * (both inclusive).
+	 *
+	 * @see BinaryType
+	 */
+	public static DataType BINARY(int n) {
+		return new AtomicDataType(new BinaryType(n));
+	}
+
+	/**
+	 * Data type of a variable-length binary string (=a sequence of bytes) {@code VARBINARY(n)} where
+	 * {@code n} is the maximum number of bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+	 * (both inclusive).
+	 *
+	 * @see VarBinaryType
+	 */
+	public static DataType VARBINARY(int n) {
+		return new AtomicDataType(new VarBinaryType(n));
+	}
+
+	/**
+	 * Data type of a variable-length binary string (=a sequence of bytes) with defined maximum length.
+	 * This is a shortcut for {@code VARBINARY(2147483647)} for representing JVM byte arrays.
+	 *
+	 * @see VarBinaryType
+	 */
+	public static DataType BYTES() {
+		return VARBINARY(Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Data type of a decimal number with fixed precision and scale {@code DECIMAL(p, s)} where {@code p}
+	 * is the number of digits in a number (=precision) and {@code s} is the number of digits to the
+	 * right of the decimal point in a number (=scale). {@code p} must have a value between 1 and 38
+	 * (both inclusive). {@code s} must have a value between 0 and {@code p} (both inclusive).
+	 *
+	 * @see DecimalType
+	 */
+	public static DataType DECIMAL(int precision, int scale) {
+		return new AtomicDataType(new DecimalType(precision, scale));
+	}
+
+	/**
+	 * Data type of a 1-byte signed integer with values from -128 to 127.
+	 *
+	 * @see TinyIntType
+	 */
+	public static DataType TINYINT() {
+		return new AtomicDataType(new TinyIntType());
+	}
+
+	/**
+	 * Data type of a 2-byte signed integer with values from -32,768 to 32,767.
+	 *
+	 * @see SmallIntType
+	 */
+	public static DataType SMALLINT() {
+		return new AtomicDataType(new SmallIntType());
+	}
+
+	/**
+	 * Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
+	 *
+	 * @see IntType
+	 */
+	public static DataType INT() {
+		return new AtomicDataType(new IntType());
+	}
+
+	/**
+	 * Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to
+	 * 9,223,372,036,854,775,807.
+	 *
+	 * @see BigIntType
+	 */
+	public static DataType BIGINT() {
+		return new AtomicDataType(new BigIntType());
+	}
+
+	/**
+	 * Data type of a 4-byte single precision floating point number.
+	 *
+	 * @see FloatType
+	 */
+	public static DataType FLOAT() {
+		return new AtomicDataType(new FloatType());
+	}
+
+	/**
+	 * Data type of an 8-byte double precision floating point number.
+	 *
+	 * @see DoubleType
+	 */
+	public static DataType DOUBLE() {
+		return new AtomicDataType(new DoubleType());
+	}
+
+	/**
+	 * Data type of a date consisting of {@code year-month-day} with values ranging from {@code 0000-01-01}
+	 * to {@code 9999-12-31}.
+	 *
+	 * <p>Compared to the SQL standard, the range starts at year {@code 0000}.
+	 *
+	 * @see DataType
+	 */
+	public static DataType DATE() {
+		return new AtomicDataType(new DateType());
+	}
+
+	/**
+	 * Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits
+	 * of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both inclusive).
+	 *
+	 * <p>An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond precision
+	 * and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
+	 *
+	 * @see TimeType
+	 */
+	public static DataType TIME(int precision) {
+		return new AtomicDataType(new TimeType(precision));
+	}
+
+	/**
+	 * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP(p)} where {@code p} is the number
+	 * of digits of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both
+	 * inclusive).
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional]} with up to
+	 * nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000} to
+	 * {@code 9999-12-31 23:59:59.999999999}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalDateTime}.
+	 *
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see TimestampType
+	 */
+	public static DataType TIMESTAMP(int precision) {
+		return new AtomicDataType(new TimestampType(precision));
+	}
+
+	/**
+	 * Data type of a timestamp WITH time zone {@code TIMESTAMP(p) WITH TIME ZONE} where {@code p} is
+	 * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0
+	 * and 9 (both inclusive).
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999999 -14:59}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.OffsetDateTime}.
+	 *
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see ZonedTimestampType
+	 */
+	public static DataType TIMESTAMP_WITH_TIME_ZONE(int precision) {
+		return new AtomicDataType(new ZonedTimestampType(precision));
+	}
+
+	/**
+	 * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where
+	 * {@code p} is the number of digits of fractional seconds (=precision). {@code p} must have a value
+	 * between 0 and 9 (both inclusive).
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not supported
+	 * as the semantics are closer to {@link java.time.OffsetDateTime}.
+	 *
+	 * <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored physically
+	 * in every datum. Instead, the type assumes {@link java.time.Instant} semantics in UTC time zone
+	 * at the edges of the table ecosystem. Every datum is interpreted in the local time zone configured
+	 * in the current session for computation and visualization.
+	 *
+	 * <p>This type fills the gap between time zone free and time zone mandatory timestamp types by
+	 * allowing the interpretation of UTC timestamps according to the configured session timezone.
+	 *
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see LocalZonedTimestampType
+	 */
+	public static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision) {
+		return new AtomicDataType(new LocalZonedTimestampType(precision));
+	}
+
+	/**
+	 * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
+	 * with up to nanosecond granularity or year-month intervals with up to month granularity.
+	 *
+	 * <p>An interval of day-time consists of {@code +days hours:months:seconds.fractional} with values
+	 * ranging from {@code -999999 23:59:59.999999999} to {@code +999999 23:59:59.999999999}. The type
+	 * must be parameterized to one of the following resolutions: interval of days, interval of days to
+	 * hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of
+	 * hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds,
+	 * or interval of seconds. The value representation is the same for all types of resolutions. For
+	 * example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds
+	 * format (with default precisions): {@code +00 00:01:10.000000}).
+	 *
+	 * <p>An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11}
+	 * to {@code +9999-11}. The type must be parameterized to one of the following resolutions: interval
+	 * of years, interval of years to months, or interval of months. The value representation is the
+	 * same for all types of resolutions. For example, an interval of months of 50 is always represented
+	 * in an interval-of-years-to-months format (with default year precision): {@code +04-02}.
+	 *
+	 * <p>Examples: {@code INTERVAL(DAY(2))} for a day-time interval or {@code INTERVAL(YEAR(4))} for
+	 * a year-month interval.
+	 *
+	 * @see DayTimeIntervalType
+	 * @see YearMonthIntervalType
+	 */
+	public static DataType INTERVAL(Resolution resolution) {
+		Preconditions.checkNotNull(resolution, "Interval resolution must not be null.");
+		return new AtomicDataType(Resolution.resolveInterval(resolution, null));
+	}
+
+	/**
+	 * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
+	 * with up to nanosecond granularity or year-month intervals with up to month granularity.
+	 *
+	 * <p>An interval of day-time consists of {@code +days hours:months:seconds.fractional} with values
+	 * ranging from {@code -999999 23:59:59.999999999} to {@code +999999 23:59:59.999999999}. The type
+	 * must be parameterized to one of the following resolutions: interval of days, interval of days to
+	 * hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of
+	 * hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds,
+	 * or interval of seconds. The value representation is the same for all types of resolutions. For
+	 * example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds
+	 * format (with default precisions): {@code +00 00:01:10.000000}).
+	 *
+	 * <p>An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11}
+	 * to {@code +9999-11}. The type must be parameterized to one of the following resolutions: interval
+	 * of years, interval of years to months, or interval of months. The value representation is the
+	 * same for all types of resolutions. For example, an interval of months of 50 is always represented
+	 * in an interval-of-years-to-months format (with default year precision): {@code +04-02}.
+	 *
+	 * <p>Examples: {@code INTERVAL(DAY(2), SECOND(9))} for a day-time interval or {@code INTERVAL(YEAR(4), MONTH())}
+	 * for a year-month interval.
+	 *
+	 * @see DayTimeIntervalType
+	 * @see YearMonthIntervalType
+	 */
+	public static DataType INTERVAL(Resolution upperResolution, Resolution lowerResolution) {
+		Preconditions.checkNotNull(upperResolution, "Upper interval resolution must not be null.");
+		Preconditions.checkNotNull(lowerResolution, "Lower interval resolution must not be null.");
+		return new AtomicDataType(Resolution.resolveInterval(upperResolution, lowerResolution));
+	}
+
+	/**
+	 * Data type of an array of elements with same subtype.
+	 *
+	 * <p>Compared to the SQL standard, the maximum cardinality of an array cannot be specified but
+	 * is fixed at {@link Integer#MAX_VALUE}. Also, any valid type is supported as a subtype.
+	 *
+	 * @see ArrayType
+	 */
+	public static DataType ARRAY(DataType elementDataType) {
+		Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+		return new CollectionDataType(new ArrayType(elementDataType.getLogicalType()), elementDataType);
+	}
+
+	/**
+	 * Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its
+	 * elements with a common subtype. Each unique value (including {@code NULL}) is mapped to some
+	 * multiplicity.
+	 *
+	 * <p>There is no restriction of element types; it is the responsibility of the user to ensure
+	 * uniqueness.
+	 *
+	 * @see MultisetType
+	 */
+	public static DataType MULTISET(DataType elementDataType) {
+		Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+		return new CollectionDataType(new MultisetType(elementDataType.getLogicalType()), elementDataType);
+	}
+
+	/**
+	 * Data type of an associative array that maps keys (including {@code NULL}) to values (including
+	 * {@code NULL}). A map cannot contain duplicate keys; each key can map to at most one value.
+	 *
+	 * <p>There is no restriction of key types; it is the responsibility of the user to ensure uniqueness.
+	 * The map type is an extension to the SQL standard.
+	 *
+	 * @see MapType
+	 */
+	public static DataType MAP(DataType keyDataType, DataType valueDataType) {
+		Preconditions.checkNotNull(keyDataType, "Key data type must not be null.");
+		Preconditions.checkNotNull(valueDataType, "Value data type must not be null.");
+		return new KeyValueDataType(
+			new MapType(keyDataType.getLogicalType(), valueDataType.getLogicalType()),
+			keyDataType,
+			valueDataType);
+	}
+
+	/**
+	 * Data type of a sequence of fields. A field consists of a field name, field type, and an optional
+	 * description. The most specific type of a row of a table is a row type. In this case, each column
+	 * of the row corresponds to the field of the row type that has the same ordinal position as the
+	 * column.
+	 *
+	 * <p>Compared to the SQL standard, an optional field description simplifies the handling with
+	 * complex structures.
+	 *
+	 * @see RowType
+	 */
+	public static DataType ROW(Field... fields) {
+		final List<RowType.RowField> logicalFields = Stream.of(fields)
+			.map(f -> Preconditions.checkNotNull(f, "Field definition must not be null."))
+			.map(f -> new RowType.RowField(f.name, f.dataType.getLogicalType(), f.description))
+			.collect(Collectors.toList());
+		final Map<String, DataType> fieldDataTypes = Stream.of(fields)
+			.collect(Collectors.toMap(f -> f.name, f -> f.dataType));
+		return new FieldsDataType(new RowType(logicalFields), fieldDataTypes);
+	}
+
+	/**
+	 * Data type for representing untyped {@code NULL} values. A null type has no other value except
+	 * {@code NULL}, thus, it can be cast to any nullable type similar to JVM semantics.
+	 *
+	 * <p>This type helps in representing unknown types in API calls that use a {@code NULL} literal
+	 * as well as bridging to formats such as JSON or Avro that define such a type as well.
+	 *
+	 * <p>The null type is an extension to the SQL standard.
+	 *
+	 * @see NullType
+	 */
+	public static DataType NULL() {
+		return new AtomicDataType(new NullType());
+	}
+
+	/**
+	 * Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
+	 * and is only deserialized at the edges.
+	 *
+	 * <p>The any type is an extension to the SQL standard.
+	 *
+	 * <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link #ANY(TypeInformation)}
+	 * for generating a serializer from Flink's core type system automatically in subsequent layers.
+	 *
+	 * @param clazz originating value class
+	 * @param serializer type serializer
+	 *
+	 * @see AnyType
+	 */
+	public static <T> DataType ANY(Class<T> clazz, TypeSerializer<T> serializer) {
+		return new AtomicDataType(new AnyType<>(clazz, serializer));
+	}
+
+	/**
+	 * Data type of an arbitrary serialized type backed by {@link TypeInformation}. This type is
+	 * a black box within the table ecosystem and is only deserialized at the edges.
+	 *
+	 * <p>The any type is an extension to the SQL standard.
+	 *
+	 * <p>Compared to an {@link #ANY(Class, TypeSerializer)}, this type does not contain a {@link TypeSerializer}
+	 * yet. The serializer will be generated from the enclosed {@link TypeInformation} but needs access
+	 * to the {@link ExecutionConfig} of the current execution environment. Thus, this type is just a
+	 * placeholder.
+	 *
+	 * @see TypeInformationAnyType
+	 */
+	public static <T> DataType ANY(TypeInformation<T> typeInformation) {
+		return new AtomicDataType(new TypeInformationAnyType<>(typeInformation));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Resolution in seconds with 6 digits for fractional seconds by default.
+	 *
+	 * @see #SECOND(int)
+	 */
+	public static Resolution SECOND() {
+		return new Resolution(Resolution.IntervalUnit.SECOND, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION);
+	}
+
+	/**
+	 * Resolution in seconds and (possibly) fractional seconds. The precision is the number of
+	 * digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). If
+	 * no fractional is specified, it is equal to 6 by default.
+	 *
+	 * @see #SECOND()
+	 */
+	public static Resolution SECOND(int precision) {
+		return new Resolution(Resolution.IntervalUnit.SECOND, precision);
+	}
+
+	/**
+	 * Resolution in minutes.
+	 */
+	public static Resolution MINUTE() {
+		return new Resolution(Resolution.IntervalUnit.MINUTE);
+	}
+
+	/**
+	 * Resolution in hours.
+	 */
+	public static Resolution HOUR() {
+		return new Resolution(Resolution.IntervalUnit.HOUR);
+	}
+
+	/**
+	 * Resolution in days. The precision is the number of digits of days. It must have a value
+	 * between 1 and 6 (both inclusive). If no precision is specified, it is equal to 2 by default.
+	 *
+	 * @see #DAY()
+	 */
+	public static Resolution DAY(int precision) {
+		return new Resolution(Resolution.IntervalUnit.DAY, precision);
+	}
+
+	/**
+	 * Resolution in days with 2 digits for the number of days by default.
+	 *
+	 * @see #DAY(int)
+	 */
+	public static Resolution DAY() {
+		return new Resolution(Resolution.IntervalUnit.DAY, DayTimeIntervalType.DEFAULT_DAY_PRECISION);
+	}
+
+	/**
+	 * Resolution in months.
+	 */
+	public static Resolution MONTH() {
+		return new Resolution(Resolution.IntervalUnit.MONTH);
+	}
+
+	/**
+	 * Resolution in years. The precision is the number of digits of years. It must have a value
+	 * between 1 and 4 (both inclusive). If no precision is specified, it is equal to 2.
+	 *
+	 * @see #YEAR()
+	 */
+	public static Resolution YEAR(int precision) {
+		return new Resolution(Resolution.IntervalUnit.YEAR, precision);
+	}
+
+	/**
+	 * Resolution in years with 2 digits for the number of years by default.
+	 *
+	 * @see #YEAR(int)
+	 */
+	public static Resolution YEAR() {
+		return new Resolution(Resolution.IntervalUnit.YEAR, YearMonthIntervalType.DEFAULT_PRECISION);
+	}
+
+	/**
+	 * Field definition with field name and data type.
+	 */
+	public static Field FIELD(String name, DataType dataType) {
+		return new Field(
+			Preconditions.checkNotNull(name, "Field name must not be null."),
+			Preconditions.checkNotNull(dataType, "Field data type must not be null."),
+			null);
+	}
+
+	/**
+	 * Field definition with field name, data type, and a description.
+	 */
+	public static Field FIELD(String name, DataType dataType, String description) {
+		return new Field(
+			Preconditions.checkNotNull(name, "Field name must not be null."),
+			Preconditions.checkNotNull(dataType, "Field data type must not be null."),
+			Preconditions.checkNotNull(description, "Field description must not be null."));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper class for defining the resolution of an interval.
+	 */
+	public static final class Resolution {
+
+		private static final int EMPTY_PRECISION = -1;
+
+		private enum IntervalUnit {
+			SECOND,
+			MINUTE,
+			HOUR,
+			DAY,
+			MONTH,
+			YEAR
+		}
+
+		private static final Map<List<IntervalUnit>, BiFunction<Integer, Integer, LogicalType>> resolutionMapping = new HashMap<>();
+		static {
+			addResolutionMapping(
+				IntervalUnit.YEAR, null,
+				(p1, p2) -> new YearMonthIntervalType(YearMonthResolution.YEAR, p1));
+			addResolutionMapping(
+				IntervalUnit.MONTH, null,
+				(p1, p2) -> new YearMonthIntervalType(YearMonthResolution.MONTH));
+			addResolutionMapping(
+				IntervalUnit.YEAR, IntervalUnit.MONTH,
+				(p1, p2) -> new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, p1));
+			addResolutionMapping(
+				IntervalUnit.DAY, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+			addResolutionMapping(
+				IntervalUnit.DAY, IntervalUnit.HOUR,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_HOUR, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+			addResolutionMapping(
+				IntervalUnit.DAY, IntervalUnit.MINUTE,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_MINUTE, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+			addResolutionMapping(
+				IntervalUnit.DAY, IntervalUnit.SECOND,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, p1, p2));
+			addResolutionMapping(
+				IntervalUnit.HOUR, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR));
+			addResolutionMapping(
+				IntervalUnit.HOUR, IntervalUnit.MINUTE,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR_TO_MINUTE));
+			addResolutionMapping(
+				IntervalUnit.HOUR, IntervalUnit.SECOND,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR_TO_SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p2));
+			addResolutionMapping(
+				IntervalUnit.MINUTE, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.MINUTE));
+			addResolutionMapping(
+				IntervalUnit.MINUTE, IntervalUnit.SECOND,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.MINUTE_TO_SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p2));
+			addResolutionMapping(
+				IntervalUnit.SECOND, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p1));
+		}
+
+		private static void addResolutionMapping(
+				IntervalUnit leftUnit,
+				IntervalUnit rightUnit,
+				BiFunction<Integer, Integer, LogicalType> typeProvider) {
+			resolutionMapping.put(Arrays.asList(leftUnit, rightUnit), typeProvider);
+		}
+
+		private static LogicalType resolveInterval(Resolution fromResolution, @Nullable Resolution toResolution) {
+			final IntervalUnit toBoundary = toResolution == null ? null : toResolution.unit;
+			final int toPrecision = toResolution == null ? EMPTY_PRECISION : toResolution.precision;
+
+			final BiFunction<Integer, Integer, LogicalType> typeProvider = resolutionMapping.get(
+				Arrays.asList(fromResolution.unit, toBoundary));
+			if (typeProvider == null) {
+				throw new ValidationException(String.format("Unsupported interval definition '%s TO %s'. " +
+					"Please check the documentation for supported combinations for year-month and day-time intervals.",
+					fromResolution.unit,
+					toBoundary));
+			}
+			return typeProvider.apply(
+				fromResolution.precision,
+				toPrecision);
+		}
+
+		private final int precision;
+
+		private final IntervalUnit unit;
+
+		private Resolution(IntervalUnit unit, int precision) {
+			this.unit = unit;
+			this.precision = precision;
+		}
+
+		private Resolution(IntervalUnit unit) {
+			this(unit, EMPTY_PRECISION);
+		}
+
+		@Override
+		public String toString() {
+			if (precision != EMPTY_PRECISION) {
+				return String.format("%s(%d)", unit, precision);
+			}
+			return unit.toString();
+		}
+	}
+
+	/**
+	 * Helper class for defining the field of a row or structured type.
+	 */
+	public static final class Field {
+
+		private final String name;
+
+		private final DataType dataType;
+
+		private final @Nullable String description;
+
+		private Field(String name, DataType dataType, String description) {
+			this.name = name;
+			this.dataType = dataType;
+			this.description = description;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public DataType getDataType() {
+			return dataType;
+		}
+
+		public Optional<String> getDescription() {
+			return Optional.ofNullable(description);
+		}
+	}
+
+	private DataTypes() {
+		// no instances
+	}
+
+	// CHECKSTYLE.ON: MethodName
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
new file mode 100644
index 0000000..aea5d6b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A data type that does not contain further data types (e.g. {@code INT} or {@code BOOLEAN}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class AtomicDataType extends DataType {
+
+	public AtomicDataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
+		super(logicalType, conversionClass);
+	}
+
+	public AtomicDataType(LogicalType logicalType) {
+		super(logicalType, null);
+	}
+
+	@Override
+	public DataType notNull() {
+		return new AtomicDataType(
+			logicalType.copy(false),
+			conversionClass);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new AtomicDataType(
+			logicalType.copy(true),
+			conversionClass);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new AtomicDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."));
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
new file mode 100644
index 0000000..2865296
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Array;
+import java.util.Objects;
+
+/**
+ * A data type that contains an element type (e.g. {@code ARRAY} or {@code MULTISET}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class CollectionDataType extends DataType {
+
+	private final DataType elementDataType;
+
+	public CollectionDataType(
+			LogicalType logicalType,
+			@Nullable Class<?> conversionClass,
+			DataType elementDataType) {
+		super(logicalType, conversionClass);
+		this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+	}
+
+	public CollectionDataType(
+			LogicalType logicalType,
+			DataType elementDataType) {
+		this(logicalType, null, elementDataType);
+	}
+
+	public DataType getElementDataType() {
+		return elementDataType;
+	}
+
+	@Override
+	public DataType notNull() {
+		return new CollectionDataType(
+			logicalType.copy(false),
+			conversionClass,
+			elementDataType);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new CollectionDataType(
+			logicalType.copy(true),
+			conversionClass,
+			elementDataType);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new CollectionDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+			elementDataType);
+	}
+
+	@Override
+	public Class<?> getConversionClass() {
+		// arrays are a special case because their default conversion class depends on the
+		// conversion class of the element type
+		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) {
+			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+		}
+		return super.getConversionClass();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		CollectionDataType that = (CollectionDataType) o;
+		return elementDataType.equals(that.elementDataType);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), elementDataType);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
new file mode 100644
index 0000000..1ba654e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Describes the data type of a value in the table ecosystem. Instances of this class can be used to
+ * declare input and/or output types of operations.
+ *
+ * <p>The {@link DataType} class has two responsibilities: declaring a logical type and giving hints
+ * about the physical representation of data to the optimizer. While the logical type is mandatory,
+ * hints are optional but useful at the edges to other APIs.
+ *
+ * <p>The logical type is independent of any physical representation and is close to the "data type"
+ * terminology of the SQL standard. See {@link org.apache.flink.table.types.logical.LogicalType} and
+ * its subclasses for more information about available logical types and their properties.
+ *
+ * <p>Physical hints are required at the edges of the table ecosystem. Hints indicate the data format
+ * that an implementation expects. For example, a data source could express that it produces values for
+ * logical timestamps using a {@link java.sql.Timestamp} class instead of using {@link java.time.LocalDateTime}.
+ * With this information, the runtime is able to convert the produced class into its internal data
+ * format. In return, a data sink can declare the data format it consumes from the runtime.
+ *
+ * @see DataTypes for a list of supported data types and instances of this class.
+ */
+@PublicEvolving
+public abstract class DataType implements Serializable {
+
+	protected LogicalType logicalType;
+
+	protected @Nullable Class<?> conversionClass;
+
+	DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
+		this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null.");
+		this.conversionClass = performEarlyClassValidation(logicalType, conversionClass);
+	}
+
+	/**
+	 * Returns the corresponding logical type.
+	 *
+	 * @return a parameterized instance of {@link LogicalType}
+	 */
+	public LogicalType getLogicalType() {
+		return logicalType;
+	}
+
+	/**
+	 * Returns the corresponding conversion class for representing values. If no conversion class was
+	 * defined manually, the default conversion defined by the logical type is used.
+	 *
+	 * @see LogicalType#getDefaultConversion()
+	 *
+	 * @return the expected conversion class
+	 */
+	public Class<?> getConversionClass() {
+		if (conversionClass == null) {
+			return logicalType.getDefaultConversion();
+		}
+		return conversionClass;
+	}
+
+	/**
+	 * Adds a hint that null values are not expected in the data for this type.
+	 *
+	 * @return a new, reconfigured data type instance
+	 */
+	public abstract DataType notNull();
+
+	/**
+	 * Adds a hint that null values are expected in the data for this type (default behavior).
+	 *
+	 * <p>This method exists for explicit declaration of the default behavior or for invalidation of
+	 * a previous call to {@link #notNull()}.
+	 *
+	 * @return a new, reconfigured data type instance
+	 */
+	public abstract DataType nullable();
+
+	/**
+	 * Adds a hint that data should be represented using the given class when entering or leaving
+	 * the table ecosystem.
+	 *
+	 * <p>A supported conversion class depends on the logical type and its nullability property.
+	 *
+	 * <p>Please see the implementation of {@link LogicalType#supportsInputConversion(Class)},
+	 * {@link LogicalType#supportsOutputConversion(Class)}, or the documentation for more information
+	 * about supported conversions.
+	 *
+	 * @return a new, reconfigured data type instance
+	 */
+	public abstract DataType bridgedTo(Class<?> newConversionClass);
+
+	@Override
+	public String toString() {
+		return logicalType.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		DataType dataType = (DataType) o;
+		return logicalType.equals(dataType.logicalType) &&
+			Objects.equals(conversionClass, dataType.conversionClass);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(logicalType, conversionClass);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This method should catch the most common errors. However, another validation is required in
+	 * deeper layers as we don't know whether the data type is used for input or output declaration.
+	 */
+	private static <C> Class<C> performEarlyClassValidation(
+			LogicalType logicalType,
+			Class<C> candidate) {
+
+		if (candidate != null &&
+				!logicalType.supportsInputConversion(candidate) &&
+				!logicalType.supportsOutputConversion(candidate)) {
+			throw new ValidationException(
+				String.format(
+					"Logical type '%s' does not support a conversion from or to class '%s'.",
+					logicalType.asSummaryString(),
+					candidate.getName()));
+		}
+		return candidate;
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
new file mode 100644
index 0000000..c08fe58
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A data type that contains field data types (e.g. {@code ROW} or structured types).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class FieldsDataType extends DataType {
+
+	private final Map<String, DataType> fieldDataTypes;
+
+	public FieldsDataType(
+			LogicalType logicalType,
+			@Nullable Class<?> conversionClass,
+			Map<String, DataType> fieldDataTypes) {
+		super(logicalType, conversionClass);
+		this.fieldDataTypes = Collections.unmodifiableMap(
+			new HashMap<>(
+				Preconditions.checkNotNull(fieldDataTypes, "Field data types must not be null.")));
+	}
+
+	public FieldsDataType(
+			LogicalType logicalType,
+			Map<String, DataType> fieldDataTypes) {
+		this(logicalType, null, fieldDataTypes);
+	}
+
+	public Map<String, DataType> getFieldDataTypes() {
+		return fieldDataTypes;
+	}
+
+	@Override
+	public DataType notNull() {
+		return new FieldsDataType(
+			logicalType.copy(false),
+			conversionClass,
+			fieldDataTypes);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new FieldsDataType(
+			logicalType.copy(true),
+			conversionClass,
+			fieldDataTypes);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new FieldsDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+			fieldDataTypes);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		FieldsDataType that = (FieldsDataType) o;
+		return fieldDataTypes.equals(that.fieldDataTypes);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), fieldDataTypes);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
new file mode 100644
index 0000000..0b345fd
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A data type that contains a key and value data type (e.g. {@code MAP}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class KeyValueDataType extends DataType {
+
+	private final DataType keyDataType;
+
+	private final DataType valueDataType;
+
+	public KeyValueDataType(
+			LogicalType logicalType,
+			@Nullable Class<?> conversionClass,
+			DataType keyDataType,
+			DataType valueDataType) {
+		super(logicalType, conversionClass);
+		this.keyDataType = Preconditions.checkNotNull(keyDataType, "Key data type must not be null.");
+		this.valueDataType = Preconditions.checkNotNull(valueDataType, "Value data type must not be null.");
+	}
+
+	public KeyValueDataType(
+			LogicalType logicalType,
+			DataType keyDataType,
+			DataType valueDataType) {
+		this(logicalType, null, keyDataType, valueDataType);
+	}
+
+	public DataType getKeyDataType() {
+		return keyDataType;
+	}
+
+	public DataType getValueDataType() {
+		return valueDataType;
+	}
+
+	@Override
+	public DataType notNull() {
+		return new KeyValueDataType(
+			logicalType.copy(false),
+			conversionClass,
+			keyDataType,
+			valueDataType);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new KeyValueDataType(
+			logicalType.copy(true),
+			conversionClass,
+			keyDataType,
+			valueDataType);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new KeyValueDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+			keyDataType,
+			valueDataType);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		KeyValueDataType that = (KeyValueDataType) o;
+		return keyDataType.equals(that.keyDataType) && valueDataType.equals(that.valueDataType);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), keyDataType, valueDataType);
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
new file mode 100644
index 0000000..2475db8
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
+import static org.apache.flink.table.types.TypeTestingUtils.hasNullability;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link DataType}.
+ */
+public class DataTypeTest {
+
+	@Test
+	public void testNullability() {
+		assertThat(
+			BIGINT().nullable(),
+			hasNullability(true));
+
+		assertThat(
+			BIGINT().notNull(),
+			hasNullability(false));
+
+		assertThat(
+			BIGINT().notNull().nullable(),
+			hasNullability(true));
+	}
+
+	@Test
+	public void testAtomicConversion() {
+		assertThat(
+			TIMESTAMP(0).bridgedTo(java.sql.Timestamp.class),
+			hasConversionClass(java.sql.Timestamp.class));
+	}
+
+	@Test
+	public void testTolerantAtomicConversion() {
+		// this is logically only supported as input type because of
+		// nullability but is tolerated until the planner complains
+		// about an output type
+		assertThat(
+			BIGINT().nullable().bridgedTo(long.class),
+			hasConversionClass(long.class));
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidAtomicConversion() {
+		TIMESTAMP(0).bridgedTo(DataTypesTest.class);
+	}
+
+	@Test
+	public void testArrayElementConversion() {
+		assertThat(
+			ARRAY(ARRAY(INT().notNull().bridgedTo(int.class))),
+			hasConversionClass(int[][].class));
+	}
+
+	@Test
+	public void testTolerantArrayConversion() {
+		// this is logically only supported as input type because of
+		// nullability but is tolerated until the planner complains
+		// about an output type
+		assertThat(
+			ARRAY(ARRAY(INT().nullable())).bridgedTo(int[][].class),
+			hasConversionClass(int[][].class));
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidArrayConversion() {
+		ARRAY(ARRAY(INT())).bridgedTo(int[][][].class);
+	}
+
+	@Test
+	public void testTolerantMapConversion() {
+		// this doesn't make much sense logically but is supported until the planner complains
+		assertThat(
+			MULTISET(MULTISET(INT().bridgedTo(int.class))),
+			hasConversionClass(Map.class));
+	}
+
+	@Test
+	public void testFields() {
+		final DataType rowDataType = ROW(FIELD("field1", CHAR(2)), FIELD("field2", BOOLEAN()));
+
+		final Map<String, DataType> fields = new HashMap<>();
+		fields.put("field1", CHAR(2));
+		fields.put("field2", BOOLEAN());
+		assertEquals(fields, ((FieldsDataType) rowDataType).getFieldDataTypes());
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidOrderInterval() {
+		INTERVAL(MONTH(), YEAR(2));
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
new file mode 100644
index 0000000..a1f4289
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.ANY;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MINUTE;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.NULL;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
+import static org.apache.flink.table.types.TypeTestingUtils.hasLogicalType;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DataTypes}.
+ */
+@RunWith(Parameterized.class)
+public class DataTypesTest {
+
+	@Parameters(name = "{index}: {0}=[Logical: {1}, Class: {2}]")
+	public static List<Object[]> dataTypes() {
+		return Arrays.asList(
+			new Object[][]{
+				{CHAR(2), new CharType(2), String.class},
+
+				{VARCHAR(2), new VarCharType(2), String.class},
+
+				{STRING(), new VarCharType(VarCharType.MAX_LENGTH), String.class},
+
+				{BOOLEAN(), new BooleanType(), Boolean.class},
+
+				{BINARY(42), new BinaryType(42), byte[].class},
+
+				{VARBINARY(42), new VarBinaryType(42), byte[].class},
+
+				{BYTES(), new VarBinaryType(VarBinaryType.MAX_LENGTH), byte[].class},
+
+				{DECIMAL(10, 10), new DecimalType(10, 10), BigDecimal.class},
+
+				{TINYINT(), new TinyIntType(), Byte.class},
+
+				{SMALLINT(), new SmallIntType(), Short.class},
+
+				{INT(), new IntType(), Integer.class},
+
+				{BIGINT(), new BigIntType(), Long.class},
+
+				{FLOAT(), new FloatType(), Float.class},
+
+				{DOUBLE(), new DoubleType(), Double.class},
+
+				{DATE(), new DateType(), java.time.LocalDate.class},
+
+				{TIME(3), new TimeType(3), java.time.LocalTime.class},
+
+				{TIMESTAMP(3), new TimestampType(3), java.time.LocalDateTime.class},
+
+				{TIMESTAMP_WITH_TIME_ZONE(3),
+					new ZonedTimestampType(3),
+					java.time.OffsetDateTime.class},
+
+				{TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
+					new LocalZonedTimestampType(3),
+					java.time.Instant.class},
+
+				{INTERVAL(MINUTE(), SECOND(3)),
+					new DayTimeIntervalType(MINUTE_TO_SECOND, DEFAULT_DAY_PRECISION, 3),
+					java.time.Duration.class},
+
+				{INTERVAL(MONTH()),
+					new YearMonthIntervalType(YearMonthResolution.MONTH),
+					java.time.Period.class},
+
+				{ARRAY(ARRAY(INT())),
+					new ArrayType(new ArrayType(new IntType())),
+					Integer[][].class},
+
+				{MULTISET(MULTISET(INT())),
+					new MultisetType(new MultisetType(new IntType())),
+					Map.class},
+
+				{MAP(INT(), SMALLINT()),
+					new MapType(new IntType(), new SmallIntType()),
+					Map.class},
+
+				{ROW(FIELD("field1", CHAR(2)), FIELD("field2", BOOLEAN())),
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("field1", new CharType(2)),
+							new RowType.RowField("field2", new BooleanType()))),
+					Row.class},
+
+				{NULL(), new NullType(), Object.class},
+
+				{ANY(Types.GENERIC(DataTypesTest.class)),
+					new TypeInformationAnyType<>(Types.GENERIC(DataTypesTest.class)),
+					DataTypesTest.class},
+
+				{ANY(Void.class, VoidSerializer.INSTANCE),
+					new AnyType<>(Void.class, VoidSerializer.INSTANCE),
+					Void.class}
+			}
+		);
+	}
+
+	@Parameter
+	public DataType dataType;
+
+	@Parameter(1)
+	public LogicalType expectedLogicalType;
+
+	@Parameter(2)
+	public Class<?> expectedConversionClass;
+
+	@Test
+	public void testLogicalType() {
+		assertThat(dataType, hasLogicalType(expectedLogicalType));
+	}
+
+	@Test
+	public void testConversionClass() {
+		assertThat(dataType, hasConversionClass(expectedConversionClass));
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java
new file mode 100644
index 0000000..16fb4d0
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+
+/**
+ * Utilities for testing types.
+ */
+public class TypeTestingUtils {
+
+	public static Matcher<DataType> hasLogicalType(LogicalType logicalType) {
+		return new FeatureMatcher<DataType, LogicalType>(
+				CoreMatchers.equalTo(logicalType),
+				"logical type of the data type",
+				"logical type") {
+
+			@Override
+			protected LogicalType featureValueOf(DataType actual) {
+				return actual.getLogicalType();
+			}
+		};
+	}
+
+	public static Matcher<DataType> hasConversionClass(Class<?> clazz) {
+		return new FeatureMatcher<DataType, Class<?>>(
+				CoreMatchers.equalTo(clazz),
+				"conversion class of the data type",
+				"conversion class") {
+
+			@Override
+			protected Class<?> featureValueOf(DataType actual) {
+				return actual.getConversionClass();
+			}
+		};
+	}
+
+	public static Matcher<DataType> hasNullability(boolean isNullable) {
+		return new FeatureMatcher<DataType, Boolean>(
+				CoreMatchers.equalTo(isNullable),
+				"nullability of the data type",
+				"nullability") {
+
+			@Override
+			protected Boolean featureValueOf(DataType actual) {
+				return actual.getLogicalType().isNullable();
+			}
+		};
+	}
+}