You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/09 12:36:24 UTC
[flink] 02/06: [FLINK-12393][table-common] Add the user-facing
classes of the new type system
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1cb9d680cbbc46e67de10acc99c5fb510b9382e5
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri May 3 13:52:23 2019 +0200
[FLINK-12393][table-common] Add the user-facing classes of the new type system
Introduces the DataType class and subclasses. Users are able to do a star import
of DataTypes and declare types like: `MULTISET(MULTISET(INT())`. Close to SQL. As
mentioned in FLIP-37, data types allow to specify format hints to the planner
using `TIMESTAMP(9).bridgedTo(java.sql.Timestamp)`.
This closes #8360.
---
.../java/org/apache/flink/table/api/DataTypes.java | 740 +++++++++++++++++++++
.../apache/flink/table/types/AtomicDataType.java | 64 ++
.../flink/table/types/CollectionDataType.java | 113 ++++
.../org/apache/flink/table/types/DataType.java | 163 +++++
.../apache/flink/table/types/FieldsDataType.java | 106 +++
.../apache/flink/table/types/KeyValueDataType.java | 113 ++++
.../org/apache/flink/table/types/DataTypeTest.java | 131 ++++
.../apache/flink/table/types/DataTypesTest.java | 211 ++++++
.../apache/flink/table/types/TypeTestingUtils.java | 70 ++
9 files changed, 1711 insertions(+)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
new file mode 100644
index 0000000..2591227
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link DataType} can be used to declare input and/or output types of operations. This class
+ * enumerates all supported data types of the Table & SQL API.
+ */
+@PublicEvolving
+public final class DataTypes {
+
+ // we use SQL-like naming for data types and avoid Java keyword clashes
+ // CHECKSTYLE.OFF: MethodName
+
+ /**
+ * Data type of a fixed-length character string {@code CHAR(n)} where {@code n} is the number
+ * of code points. {@code n} must have a value between 1 and 255 (both inclusive).
+ *
+ * @see CharType
+ */
+ public static DataType CHAR(int n) {
+ return new AtomicDataType(new CharType(n));
+ }
+
+ /**
+ * Data type of a variable-length character string {@code VARCHAR(n)} where {@code n} is the
+ * maximum number of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+ * (both inclusive).
+ *
+ * @see VarCharType
+ */
+ public static DataType VARCHAR(int n) {
+ return new AtomicDataType(new VarCharType(n));
+ }
+
+ /**
+ * Data type of a variable-length character string with defined maximum length. This is a shortcut
+ * for {@code VARCHAR(2147483647)} for representing JVM strings.
+ *
+ * @see VarCharType
+ */
+ public static DataType STRING() {
+ return VARCHAR(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Data type of a boolean with a (possibly) three-valued logic of {@code TRUE, FALSE, UNKNOWN}.
+ *
+ * @see BooleanType
+ */
+ public static DataType BOOLEAN() {
+ return new AtomicDataType(new BooleanType());
+ }
+
+ /**
+ * Data type of a fixed-length binary string (=a sequence of bytes) {@code BINARY(n)} where
+ * {@code n} is the number of bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+ * (both inclusive).
+ *
+ * @see BinaryType
+ */
+ public static DataType BINARY(int n) {
+ return new AtomicDataType(new BinaryType(n));
+ }
+
+ /**
+ * Data type of a variable-length binary string (=a sequence of bytes) {@code VARBINARY(n)} where
+ * {@code n} is the maximum number of bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+ * (both inclusive).
+ *
+ * @see VarBinaryType
+ */
+ public static DataType VARBINARY(int n) {
+ return new AtomicDataType(new VarBinaryType(n));
+ }
+
+ /**
+ * Data type of a variable-length binary string (=a sequence of bytes) with defined maximum length.
+ * This is a shortcut for {@code VARBINARY(2147483647)} for representing JVM byte arrays.
+ *
+ * @see VarBinaryType
+ */
+ public static DataType BYTES() {
+ return VARBINARY(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Data type of a decimal number with fixed precision and scale {@code DECIMAL(p, s)} where {@code p}
+ * is the number of digits in a number (=precision) and {@code s} is the number of digits to the
+ * right of the decimal point in a number (=scale). {@code p} must have a value between 1 and 38
+ * (both inclusive). {@code s} must have a value between 0 and {@code p} (both inclusive).
+ *
+ * @see DecimalType
+ */
+ public static DataType DECIMAL(int precision, int scale) {
+ return new AtomicDataType(new DecimalType(precision, scale));
+ }
+
+ /**
+ * Data type of a 1-byte signed integer with values from -128 to 127.
+ *
+ * @see TinyIntType
+ */
+ public static DataType TINYINT() {
+ return new AtomicDataType(new TinyIntType());
+ }
+
+ /**
+ * Data type of a 2-byte signed integer with values from -32,768 to 32,767.
+ *
+ * @see SmallIntType
+ */
+ public static DataType SMALLINT() {
+ return new AtomicDataType(new SmallIntType());
+ }
+
+ /**
+ * Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
+ *
+ * @see IntType
+ */
+ public static DataType INT() {
+ return new AtomicDataType(new IntType());
+ }
+
+ /**
+ * Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to
+ * 9,223,372,036,854,775,807.
+ *
+ * @see BigIntType
+ */
+ public static DataType BIGINT() {
+ return new AtomicDataType(new BigIntType());
+ }
+
+ /**
+ * Data type of a 4-byte single precision floating point number.
+ *
+ * @see FloatType
+ */
+ public static DataType FLOAT() {
+ return new AtomicDataType(new FloatType());
+ }
+
+ /**
+ * Data type of an 8-byte double precision floating point number.
+ *
+ * @see DoubleType
+ */
+ public static DataType DOUBLE() {
+ return new AtomicDataType(new DoubleType());
+ }
+
+ /**
+ * Data type of a date consisting of {@code year-month-day} with values ranging from {@code 0000-01-01}
+ * to {@code 9999-12-31}.
+ *
+ * <p>Compared to the SQL standard, the range starts at year {@code 0000}.
+ *
+ * @see DataType
+ */
+ public static DataType DATE() {
+ return new AtomicDataType(new DateType());
+ }
+
+ /**
+ * Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits
+ * of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both inclusive).
+ *
+ * <p>An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond precision
+ * and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}.
+ *
+ * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+ * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
+ *
+ * @see TimeType
+ */
+ public static DataType TIME(int precision) {
+ return new AtomicDataType(new TimeType(precision));
+ }
+
+ /**
+ * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP(p)} where {@code p} is the number
+ * of digits of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both
+ * inclusive).
+ *
+ * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional]} with up to
+ * nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000} to
+ * {@code 9999-12-31 23:59:59.999999999}.
+ *
+ * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+ * semantics are closer to {@link java.time.LocalDateTime}.
+ *
+ * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+ * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+ * @see TimestampType
+ */
+ public static DataType TIMESTAMP(int precision) {
+ return new AtomicDataType(new TimestampType(precision));
+ }
+
+ /**
+ * Data type of a timestamp WITH time zone {@code TIMESTAMP(p) WITH TIME ZONE} where {@code p} is
+ * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0
+ * and 9 (both inclusive).
+ *
+ * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+ * to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to
+ * {@code 9999-12-31 23:59:59.999999999 -14:59}.
+ *
+ * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+ * semantics are closer to {@link java.time.OffsetDateTime}.
+ *
+ * @see #TIMESTAMP(int)
+ * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+ * @see ZonedTimestampType
+ */
+ public static DataType TIMESTAMP_WITH_TIME_ZONE(int precision) {
+ return new AtomicDataType(new ZonedTimestampType(precision));
+ }
+
+ /**
+ * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where
+ * {@code p} is the number of digits of fractional seconds (=precision). {@code p} must have a value
+ * between 0 and 9 (both inclusive).
+ *
+ * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+ * to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to
+ * {@code 9999-12-31 23:59:59.999999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not supported
+ * as the semantics are closer to {@link java.time.OffsetDateTime}.
+ *
+ * <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored physically
+ * in every datum. Instead, the type assumes {@link java.time.Instant} semantics in UTC time zone
+ * at the edges of the table ecosystem. Every datum is interpreted in the local time zone configured
+ * in the current session for computation and visualization.
+ *
+ * <p>This type fills the gap between time zone free and time zone mandatory timestamp types by
+ * allowing the interpretation of UTC timestamps according to the configured session timezone.
+ *
+ * @see #TIMESTAMP(int)
+ * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+ * @see LocalZonedTimestampType
+ */
+ public static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision) {
+ return new AtomicDataType(new LocalZonedTimestampType(precision));
+ }
+
+ /**
+ * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
+ * with up to nanosecond granularity or year-month intervals with up to month granularity.
+ *
+ * <p>An interval of day-time consists of {@code +days hours:months:seconds.fractional} with values
+ * ranging from {@code -999999 23:59:59.999999999} to {@code +999999 23:59:59.999999999}. The type
+ * must be parameterized to one of the following resolutions: interval of days, interval of days to
+ * hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of
+ * hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds,
+ * or interval of seconds. The value representation is the same for all types of resolutions. For
+ * example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds
+ * format (with default precisions): {@code +00 00:01:10.000000}).
+ *
+ * <p>An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11}
+ * to {@code +9999-11}. The type must be parameterized to one of the following resolutions: interval
+ * of years, interval of years to months, or interval of months. The value representation is the
+ * same for all types of resolutions. For example, an interval of months of 50 is always represented
+ * in an interval-of-years-to-months format (with default year precision): {@code +04-02}.
+ *
+ * <p>Examples: {@code INTERVAL(DAY(2))} for a day-time interval or {@code INTERVAL(YEAR(4))} for
+ * a year-month interval.
+ *
+ * @see DayTimeIntervalType
+ * @see YearMonthIntervalType
+ */
+ public static DataType INTERVAL(Resolution resolution) {
+ Preconditions.checkNotNull(resolution, "Interval resolution must not be null.");
+ return new AtomicDataType(Resolution.resolveInterval(resolution, null));
+ }
+
+ /**
+ * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
+ * with up to nanosecond granularity or year-month intervals with up to month granularity.
+ *
+ * <p>An interval of day-time consists of {@code +days hours:months:seconds.fractional} with values
+ * ranging from {@code -999999 23:59:59.999999999} to {@code +999999 23:59:59.999999999}. The type
+ * must be parameterized to one of the following resolutions: interval of days, interval of days to
+ * hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of
+ * hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds,
+ * or interval of seconds. The value representation is the same for all types of resolutions. For
+ * example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds
+ * format (with default precisions): {@code +00 00:01:10.000000}).
+ *
+ * <p>An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11}
+ * to {@code +9999-11}. The type must be parameterized to one of the following resolutions: interval
+ * of years, interval of years to months, or interval of months. The value representation is the
+ * same for all types of resolutions. For example, an interval of months of 50 is always represented
+ * in an interval-of-years-to-months format (with default year precision): {@code +04-02}.
+ *
+ * <p>Examples: {@code INTERVAL(DAY(2), SECOND(9))} for a day-time interval or {@code INTERVAL(YEAR(4), MONTH())}
+ * for a year-month interval.
+ *
+ * @see DayTimeIntervalType
+ * @see YearMonthIntervalType
+ */
+ public static DataType INTERVAL(Resolution upperResolution, Resolution lowerResolution) {
+ Preconditions.checkNotNull(upperResolution, "Upper interval resolution must not be null.");
+ Preconditions.checkNotNull(lowerResolution, "Lower interval resolution must not be null.");
+ return new AtomicDataType(Resolution.resolveInterval(upperResolution, lowerResolution));
+ }
+
+ /**
+ * Data type of an array of elements with same subtype.
+ *
+ * <p>Compared to the SQL standard, the maximum cardinality of an array cannot be specified but
+ * is fixed at {@link Integer#MAX_VALUE}. Also, any valid type is supported as a subtype.
+ *
+ * @see ArrayType
+ */
+ public static DataType ARRAY(DataType elementDataType) {
+ Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+ return new CollectionDataType(new ArrayType(elementDataType.getLogicalType()), elementDataType);
+ }
+
+ /**
+ * Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its
+ * elements with a common subtype. Each unique value (including {@code NULL}) is mapped to some
+ * multiplicity.
+ *
+ * <p>There is no restriction of element types; it is the responsibility of the user to ensure
+ * uniqueness.
+ *
+ * @see MultisetType
+ */
+ public static DataType MULTISET(DataType elementDataType) {
+ Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+ return new CollectionDataType(new MultisetType(elementDataType.getLogicalType()), elementDataType);
+ }
+
+ /**
+ * Data type of an associative array that maps keys (including {@code NULL}) to values (including
+ * {@code NULL}). A map cannot contain duplicate keys; each key can map to at most one value.
+ *
+ * <p>There is no restriction of key types; it is the responsibility of the user to ensure uniqueness.
+ * The map type is an extension to the SQL standard.
+ *
+ * @see MapType
+ */
+ public static DataType MAP(DataType keyDataType, DataType valueDataType) {
+ Preconditions.checkNotNull(keyDataType, "Key data type must not be null.");
+ Preconditions.checkNotNull(valueDataType, "Value data type must not be null.");
+ return new KeyValueDataType(
+ new MapType(keyDataType.getLogicalType(), valueDataType.getLogicalType()),
+ keyDataType,
+ valueDataType);
+ }
+
+ /**
+ * Data type of a sequence of fields. A field consists of a field name, field type, and an optional
+ * description. The most specific type of a row of a table is a row type. In this case, each column
+ * of the row corresponds to the field of the row type that has the same ordinal position as the
+ * column.
+ *
+ * <p>Compared to the SQL standard, an optional field description simplifies the handling with
+ * complex structures.
+ *
+ * @see RowType
+ */
+ public static DataType ROW(Field... fields) {
+ final List<RowType.RowField> logicalFields = Stream.of(fields)
+ .map(f -> Preconditions.checkNotNull(f, "Field definition must not be null."))
+ .map(f -> new RowType.RowField(f.name, f.dataType.getLogicalType(), f.description))
+ .collect(Collectors.toList());
+ final Map<String, DataType> fieldDataTypes = Stream.of(fields)
+ .collect(Collectors.toMap(f -> f.name, f -> f.dataType));
+ return new FieldsDataType(new RowType(logicalFields), fieldDataTypes);
+ }
+
+ /**
+ * Data type for representing untyped {@code NULL} values. A null type has no other value except
+ * {@code NULL}, thus, it can be cast to any nullable type similar to JVM semantics.
+ *
+ * <p>This type helps in representing unknown types in API calls that use a {@code NULL} literal
+ * as well as bridging to formats such as JSON or Avro that define such a type as well.
+ *
+ * <p>The null type is an extension to the SQL standard.
+ *
+ * @see NullType
+ */
+ public static DataType NULL() {
+ return new AtomicDataType(new NullType());
+ }
+
+ /**
+ * Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
+ * and is only deserialized at the edges.
+ *
+ * <p>The any type is an extension to the SQL standard.
+ *
+ * <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link #ANY(TypeInformation)}
+ * for generating a serializer from Flink's core type system automatically in subsequent layers.
+ *
+ * @param clazz originating value class
+ * @param serializer type serializer
+ *
+ * @see AnyType
+ */
+ public static <T> DataType ANY(Class<T> clazz, TypeSerializer<T> serializer) {
+ return new AtomicDataType(new AnyType<>(clazz, serializer));
+ }
+
+ /**
+ * Data type of an arbitrary serialized type backed by {@link TypeInformation}. This type is
+ * a black box within the table ecosystem and is only deserialized at the edges.
+ *
+ * <p>The any type is an extension to the SQL standard.
+ *
+ * <p>Compared to an {@link #ANY(Class, TypeSerializer)}, this type does not contain a {@link TypeSerializer}
+ * yet. The serializer will be generated from the enclosed {@link TypeInformation} but needs access
+ * to the {@link ExecutionConfig} of the current execution environment. Thus, this type is just a
+ * placeholder.
+ *
+ * @see TypeInformationAnyType
+ */
+ public static <T> DataType ANY(TypeInformation<T> typeInformation) {
+ return new AtomicDataType(new TypeInformationAnyType<>(typeInformation));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper functions
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Resolution in seconds with 6 digits for fractional seconds by default.
+ *
+ * @see #SECOND(int)
+ */
+ public static Resolution SECOND() {
+ return new Resolution(Resolution.IntervalUnit.SECOND, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION);
+ }
+
+ /**
+ * Resolution in seconds and (possibly) fractional seconds. The precision is the number of
+ * digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). If
+ * no fractional is specified, it is equal to 6 by default.
+ *
+ * @see #SECOND()
+ */
+ public static Resolution SECOND(int precision) {
+ return new Resolution(Resolution.IntervalUnit.SECOND, precision);
+ }
+
+ /**
+ * Resolution in minutes.
+ */
+ public static Resolution MINUTE() {
+ return new Resolution(Resolution.IntervalUnit.MINUTE);
+ }
+
+ /**
+ * Resolution in hours.
+ */
+ public static Resolution HOUR() {
+ return new Resolution(Resolution.IntervalUnit.HOUR);
+ }
+
+ /**
+ * Resolution in days. The precision is the number of digits of days. It must have a value
+ * between 1 and 6 (both inclusive). If no precision is specified, it is equal to 2 by default.
+ *
+ * @see #DAY()
+ */
+ public static Resolution DAY(int precision) {
+ return new Resolution(Resolution.IntervalUnit.DAY, precision);
+ }
+
+ /**
+ * Resolution in days with 2 digits for the number of days by default.
+ *
+ * @see #DAY(int)
+ */
+ public static Resolution DAY() {
+ return new Resolution(Resolution.IntervalUnit.DAY, DayTimeIntervalType.DEFAULT_DAY_PRECISION);
+ }
+
+ /**
+ * Resolution in months.
+ */
+ public static Resolution MONTH() {
+ return new Resolution(Resolution.IntervalUnit.MONTH);
+ }
+
+ /**
+ * Resolution in years. The precision is the number of digits of years. It must have a value
+ * between 1 and 4 (both inclusive). If no precision is specified, it is equal to 2.
+ *
+ * @see #YEAR()
+ */
+ public static Resolution YEAR(int precision) {
+ return new Resolution(Resolution.IntervalUnit.YEAR, precision);
+ }
+
+ /**
+ * Resolution in years with 2 digits for the number of years by default.
+ *
+ * @see #YEAR(int)
+ */
+ public static Resolution YEAR() {
+ return new Resolution(Resolution.IntervalUnit.YEAR, YearMonthIntervalType.DEFAULT_PRECISION);
+ }
+
+ /**
+ * Field definition with field name and data type.
+ */
+ public static Field FIELD(String name, DataType dataType) {
+ return new Field(
+ Preconditions.checkNotNull(name, "Field name must not be null."),
+ Preconditions.checkNotNull(dataType, "Field data type must not be null."),
+ null);
+ }
+
+ /**
+ * Field definition with field name, data type, and a description.
+ */
+ public static Field FIELD(String name, DataType dataType, String description) {
+ return new Field(
+ Preconditions.checkNotNull(name, "Field name must not be null."),
+ Preconditions.checkNotNull(dataType, "Field data type must not be null."),
+ Preconditions.checkNotNull(description, "Field description must not be null."));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper classes
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Helper class for defining the resolution of an interval.
+ */
+ public static final class Resolution {
+
+ private static final int EMPTY_PRECISION = -1;
+
+ private enum IntervalUnit {
+ SECOND,
+ MINUTE,
+ HOUR,
+ DAY,
+ MONTH,
+ YEAR
+ }
+
+ private static final Map<List<IntervalUnit>, BiFunction<Integer, Integer, LogicalType>> resolutionMapping = new HashMap<>();
+ static {
+ addResolutionMapping(
+ IntervalUnit.YEAR, null,
+ (p1, p2) -> new YearMonthIntervalType(YearMonthResolution.YEAR, p1));
+ addResolutionMapping(
+ IntervalUnit.MONTH, null,
+ (p1, p2) -> new YearMonthIntervalType(YearMonthResolution.MONTH));
+ addResolutionMapping(
+ IntervalUnit.YEAR, IntervalUnit.MONTH,
+ (p1, p2) -> new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, p1));
+ addResolutionMapping(
+ IntervalUnit.DAY, null,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+ addResolutionMapping(
+ IntervalUnit.DAY, IntervalUnit.HOUR,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_HOUR, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+ addResolutionMapping(
+ IntervalUnit.DAY, IntervalUnit.MINUTE,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_MINUTE, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+ addResolutionMapping(
+ IntervalUnit.DAY, IntervalUnit.SECOND,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, p1, p2));
+ addResolutionMapping(
+ IntervalUnit.HOUR, null,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR));
+ addResolutionMapping(
+ IntervalUnit.HOUR, IntervalUnit.MINUTE,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR_TO_MINUTE));
+ addResolutionMapping(
+ IntervalUnit.HOUR, IntervalUnit.SECOND,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR_TO_SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p2));
+ addResolutionMapping(
+ IntervalUnit.MINUTE, null,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.MINUTE));
+ addResolutionMapping(
+ IntervalUnit.MINUTE, IntervalUnit.SECOND,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.MINUTE_TO_SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p2));
+ addResolutionMapping(
+ IntervalUnit.SECOND, null,
+ (p1, p2) -> new DayTimeIntervalType(DayTimeResolution.SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p1));
+ }
+
+ private static void addResolutionMapping(
+ IntervalUnit leftUnit,
+ IntervalUnit rightUnit,
+ BiFunction<Integer, Integer, LogicalType> typeProvider) {
+ resolutionMapping.put(Arrays.asList(leftUnit, rightUnit), typeProvider);
+ }
+
+ private static LogicalType resolveInterval(Resolution fromResolution, @Nullable Resolution toResolution) {
+ final IntervalUnit toBoundary = toResolution == null ? null : toResolution.unit;
+ final int toPrecision = toResolution == null ? EMPTY_PRECISION : toResolution.precision;
+
+ final BiFunction<Integer, Integer, LogicalType> typeProvider = resolutionMapping.get(
+ Arrays.asList(fromResolution.unit, toBoundary));
+ if (typeProvider == null) {
+ throw new ValidationException(String.format("Unsupported interval definition '%s TO %s'. " +
+ "Please check the documentation for supported combinations for year-month and day-time intervals.",
+ fromResolution.unit,
+ toBoundary));
+ }
+ return typeProvider.apply(
+ fromResolution.precision,
+ toPrecision);
+ }
+
+ private final int precision;
+
+ private final IntervalUnit unit;
+
+ private Resolution(IntervalUnit unit, int precision) {
+ this.unit = unit;
+ this.precision = precision;
+ }
+
+ private Resolution(IntervalUnit unit) {
+ this(unit, EMPTY_PRECISION);
+ }
+
+ @Override
+ public String toString() {
+ if (precision != EMPTY_PRECISION) {
+ return String.format("%s(%d)", unit, precision);
+ }
+ return unit.toString();
+ }
+ }
+
+ /**
+ * Helper class for defining the field of a row or structured type.
+ */
+ public static final class Field {
+
+ private final String name;
+
+ private final DataType dataType;
+
+ private final @Nullable String description;
+
+ private Field(String name, DataType dataType, String description) {
+ this.name = name;
+ this.dataType = dataType;
+ this.description = description;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public Optional<String> getDescription() {
+ return Optional.ofNullable(description);
+ }
+ }
+
+ private DataTypes() {
+ // no instances
+ }
+
+ // CHECKSTYLE.ON: MethodName
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
new file mode 100644
index 0000000..aea5d6b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A data type that does not contain further data types (e.g. {@code INT} or {@code BOOLEAN}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class AtomicDataType extends DataType {
+
+ public AtomicDataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
+ super(logicalType, conversionClass);
+ }
+
+ public AtomicDataType(LogicalType logicalType) {
+ super(logicalType, null);
+ }
+
+ @Override
+ public DataType notNull() {
+ return new AtomicDataType(
+ logicalType.copy(false),
+ conversionClass);
+ }
+
+ @Override
+ public DataType nullable() {
+ return new AtomicDataType(
+ logicalType.copy(true),
+ conversionClass);
+ }
+
+ @Override
+ public DataType bridgedTo(Class<?> newConversionClass) {
+ return new AtomicDataType(
+ logicalType,
+ Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."));
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
new file mode 100644
index 0000000..2865296
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Array;
+import java.util.Objects;
+
+/**
+ * A data type that contains an element type (e.g. {@code ARRAY} or {@code MULTISET}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class CollectionDataType extends DataType {
+
+ private final DataType elementDataType;
+
+ public CollectionDataType(
+ LogicalType logicalType,
+ @Nullable Class<?> conversionClass,
+ DataType elementDataType) {
+ super(logicalType, conversionClass);
+ this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+ }
+
+ public CollectionDataType(
+ LogicalType logicalType,
+ DataType elementDataType) {
+ this(logicalType, null, elementDataType);
+ }
+
+ public DataType getElementDataType() {
+ return elementDataType;
+ }
+
+ @Override
+ public DataType notNull() {
+ return new CollectionDataType(
+ logicalType.copy(false),
+ conversionClass,
+ elementDataType);
+ }
+
+ @Override
+ public DataType nullable() {
+ return new CollectionDataType(
+ logicalType.copy(true),
+ conversionClass,
+ elementDataType);
+ }
+
+ @Override
+ public DataType bridgedTo(Class<?> newConversionClass) {
+ return new CollectionDataType(
+ logicalType,
+ Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+ elementDataType);
+ }
+
+ @Override
+ public Class<?> getConversionClass() {
+ // arrays are a special case because their default conversion class depends on the
+ // conversion class of the element type
+ if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) {
+ return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+ }
+ return super.getConversionClass();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ CollectionDataType that = (CollectionDataType) o;
+ return elementDataType.equals(that.elementDataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), elementDataType);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
new file mode 100644
index 0000000..1ba654e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Describes the data type of a value in the table ecosystem. Instances of this class can be used to
+ * declare input and/or output types of operations.
+ *
+ * <p>The {@link DataType} class has two responsibilities: declaring a logical type and giving hints
+ * about the physical representation of data to the optimizer. While the logical type is mandatory,
+ * hints are optional but useful at the edges to other APIs.
+ *
+ * <p>The logical type is independent of any physical representation and is close to the "data type"
+ * terminology of the SQL standard. See {@link org.apache.flink.table.types.logical.LogicalType} and
+ * its subclasses for more information about available logical types and their properties.
+ *
+ * <p>Physical hints are required at the edges of the table ecosystem. Hints indicate the data format
+ * that an implementation expects. For example, a data source could express that it produces values for
+ * logical timestamps using a {@link java.sql.Timestamp} class instead of using {@link java.time.LocalDateTime}.
+ * With this information, the runtime is able to convert the produced class into its internal data
+ * format. In return, a data sink can declare the data format it consumes from the runtime.
+ *
+ * @see DataTypes for a list of supported data types and instances of this class.
+ */
+@PublicEvolving
+public abstract class DataType implements Serializable {
+
+ protected LogicalType logicalType;
+
+ protected @Nullable Class<?> conversionClass;
+
+ DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
+ this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null.");
+ this.conversionClass = performEarlyClassValidation(logicalType, conversionClass);
+ }
+
+ /**
+ * Returns the corresponding logical type.
+ *
+ * @return a parameterized instance of {@link LogicalType}
+ */
+ public LogicalType getLogicalType() {
+ return logicalType;
+ }
+
+ /**
+ * Returns the corresponding conversion class for representing values. If no conversion class was
+ * defined manually, the default conversion defined by the logical type is used.
+ *
+ * @see LogicalType#getDefaultConversion()
+ *
+ * @return the expected conversion class
+ */
+ public Class<?> getConversionClass() {
+ if (conversionClass == null) {
+ return logicalType.getDefaultConversion();
+ }
+ return conversionClass;
+ }
+
+ /**
+ * Adds a hint that null values are not expected in the data for this type.
+ *
+ * @return a new, reconfigured data type instance
+ */
+ public abstract DataType notNull();
+
+ /**
+ * Adds a hint that null values are expected in the data for this type (default behavior).
+ *
+ * <p>This method exists for explicit declaration of the default behavior or for invalidation of
+ * a previous call to {@link #notNull()}.
+ *
+ * @return a new, reconfigured data type instance
+ */
+ public abstract DataType nullable();
+
+ /**
+ * Adds a hint that data should be represented using the given class when entering or leaving
+ * the table ecosystem.
+ *
+ * <p>A supported conversion class depends on the logical type and its nullability property.
+ *
+ * <p>Please see the implementation of {@link LogicalType#supportsInputConversion(Class)},
+ * {@link LogicalType#supportsOutputConversion(Class)}, or the documentation for more information
+ * about supported conversions.
+ *
+ * @return a new, reconfigured data type instance
+ */
+ public abstract DataType bridgedTo(Class<?> newConversionClass);
+
+ @Override
+ public String toString() {
+ return logicalType.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataType dataType = (DataType) o;
+ return logicalType.equals(dataType.logicalType) &&
+ Objects.equals(conversionClass, dataType.conversionClass);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(logicalType, conversionClass);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method should catch the most common errors. However, another validation is required in
+ * deeper layers as we don't know whether the data type is used for input or output declaration.
+ */
+ private static <C> Class<C> performEarlyClassValidation(
+ LogicalType logicalType,
+ Class<C> candidate) {
+
+ if (candidate != null &&
+ !logicalType.supportsInputConversion(candidate) &&
+ !logicalType.supportsOutputConversion(candidate)) {
+ throw new ValidationException(
+ String.format(
+ "Logical type '%s' does not support a conversion from or to class '%s'.",
+ logicalType.asSummaryString(),
+ candidate.getName()));
+ }
+ return candidate;
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
new file mode 100644
index 0000000..c08fe58
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A data type that contains field data types (e.g. {@code ROW} or structured types).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class FieldsDataType extends DataType {
+
+ private final Map<String, DataType> fieldDataTypes;
+
+ public FieldsDataType(
+ LogicalType logicalType,
+ @Nullable Class<?> conversionClass,
+ Map<String, DataType> fieldDataTypes) {
+ super(logicalType, conversionClass);
+ this.fieldDataTypes = Collections.unmodifiableMap(
+ new HashMap<>(
+ Preconditions.checkNotNull(fieldDataTypes, "Field data types must not be null.")));
+ }
+
+ public FieldsDataType(
+ LogicalType logicalType,
+ Map<String, DataType> fieldDataTypes) {
+ this(logicalType, null, fieldDataTypes);
+ }
+
+ public Map<String, DataType> getFieldDataTypes() {
+ return fieldDataTypes;
+ }
+
+ @Override
+ public DataType notNull() {
+ return new FieldsDataType(
+ logicalType.copy(false),
+ conversionClass,
+ fieldDataTypes);
+ }
+
+ @Override
+ public DataType nullable() {
+ return new FieldsDataType(
+ logicalType.copy(true),
+ conversionClass,
+ fieldDataTypes);
+ }
+
+ @Override
+ public DataType bridgedTo(Class<?> newConversionClass) {
+ return new FieldsDataType(
+ logicalType,
+ Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+ fieldDataTypes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ FieldsDataType that = (FieldsDataType) o;
+ return fieldDataTypes.equals(that.fieldDataTypes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), fieldDataTypes);
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
new file mode 100644
index 0000000..0b345fd
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A data type that contains a key and value data type (e.g. {@code MAP}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class KeyValueDataType extends DataType {
+
+ private final DataType keyDataType;
+
+ private final DataType valueDataType;
+
+ public KeyValueDataType(
+ LogicalType logicalType,
+ @Nullable Class<?> conversionClass,
+ DataType keyDataType,
+ DataType valueDataType) {
+ super(logicalType, conversionClass);
+ this.keyDataType = Preconditions.checkNotNull(keyDataType, "Key data type must not be null.");
+ this.valueDataType = Preconditions.checkNotNull(valueDataType, "Value data type must not be null.");
+ }
+
+ public KeyValueDataType(
+ LogicalType logicalType,
+ DataType keyDataType,
+ DataType valueDataType) {
+ this(logicalType, null, keyDataType, valueDataType);
+ }
+
+ public DataType getKeyDataType() {
+ return keyDataType;
+ }
+
+ public DataType getValueDataType() {
+ return valueDataType;
+ }
+
+ @Override
+ public DataType notNull() {
+ return new KeyValueDataType(
+ logicalType.copy(false),
+ conversionClass,
+ keyDataType,
+ valueDataType);
+ }
+
+ @Override
+ public DataType nullable() {
+ return new KeyValueDataType(
+ logicalType.copy(true),
+ conversionClass,
+ keyDataType,
+ valueDataType);
+ }
+
+ @Override
+ public DataType bridgedTo(Class<?> newConversionClass) {
+ return new KeyValueDataType(
+ logicalType,
+ Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+ keyDataType,
+ valueDataType);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ KeyValueDataType that = (KeyValueDataType) o;
+ return keyDataType.equals(that.keyDataType) && valueDataType.equals(that.valueDataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), keyDataType, valueDataType);
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
new file mode 100644
index 0000000..2475db8
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
+import static org.apache.flink.table.types.TypeTestingUtils.hasNullability;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link DataType}.
+ */
+public class DataTypeTest {
+
+ @Test
+ public void testNullability() {
+ assertThat(
+ BIGINT().nullable(),
+ hasNullability(true));
+
+ assertThat(
+ BIGINT().notNull(),
+ hasNullability(false));
+
+ assertThat(
+ BIGINT().notNull().nullable(),
+ hasNullability(true));
+ }
+
+ @Test
+ public void testAtomicConversion() {
+ assertThat(
+ TIMESTAMP(0).bridgedTo(java.sql.Timestamp.class),
+ hasConversionClass(java.sql.Timestamp.class));
+ }
+
+ @Test
+ public void testTolerantAtomicConversion() {
+ // this is logically only supported as input type because of
+ // nullability but is tolerated until the planner complains
+ // about an output type
+ assertThat(
+ BIGINT().nullable().bridgedTo(long.class),
+ hasConversionClass(long.class));
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testInvalidAtomicConversion() {
+ TIMESTAMP(0).bridgedTo(DataTypesTest.class);
+ }
+
+ @Test
+ public void testArrayElementConversion() {
+ assertThat(
+ ARRAY(ARRAY(INT().notNull().bridgedTo(int.class))),
+ hasConversionClass(int[][].class));
+ }
+
+ @Test
+ public void testTolerantArrayConversion() {
+ // this is logically only supported as input type because of
+ // nullability but is tolerated until the planner complains
+ // about an output type
+ assertThat(
+ ARRAY(ARRAY(INT().nullable())).bridgedTo(int[][].class),
+ hasConversionClass(int[][].class));
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testInvalidArrayConversion() {
+ ARRAY(ARRAY(INT())).bridgedTo(int[][][].class);
+ }
+
+ @Test
+ public void testTolerantMapConversion() {
+ // this doesn't make much sense logically but is supported until the planner complains
+ assertThat(
+ MULTISET(MULTISET(INT().bridgedTo(int.class))),
+ hasConversionClass(Map.class));
+ }
+
+ @Test
+ public void testFields() {
+ final DataType rowDataType = ROW(FIELD("field1", CHAR(2)), FIELD("field2", BOOLEAN()));
+
+ final Map<String, DataType> fields = new HashMap<>();
+ fields.put("field1", CHAR(2));
+ fields.put("field2", BOOLEAN());
+ assertEquals(fields, ((FieldsDataType) rowDataType).getFieldDataTypes());
+ }
+
+ @Test(expected = ValidationException.class)
+ public void testInvalidOrderInterval() {
+ INTERVAL(MONTH(), YEAR(2));
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
new file mode 100644
index 0000000..a1f4289
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.ANY;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MINUTE;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.NULL;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
+import static org.apache.flink.table.types.TypeTestingUtils.hasLogicalType;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DataTypes}.
+ */
+@RunWith(Parameterized.class)
+public class DataTypesTest {
+
+ @Parameters(name = "{index}: {0}=[Logical: {1}, Class: {2}]")
+ public static List<Object[]> dataTypes() {
+ return Arrays.asList(
+ new Object[][]{
+ {CHAR(2), new CharType(2), String.class},
+
+ {VARCHAR(2), new VarCharType(2), String.class},
+
+ {STRING(), new VarCharType(VarCharType.MAX_LENGTH), String.class},
+
+ {BOOLEAN(), new BooleanType(), Boolean.class},
+
+ {BINARY(42), new BinaryType(42), byte[].class},
+
+ {VARBINARY(42), new VarBinaryType(42), byte[].class},
+
+ {BYTES(), new VarBinaryType(VarBinaryType.MAX_LENGTH), byte[].class},
+
+ {DECIMAL(10, 10), new DecimalType(10, 10), BigDecimal.class},
+
+ {TINYINT(), new TinyIntType(), Byte.class},
+
+ {SMALLINT(), new SmallIntType(), Short.class},
+
+ {INT(), new IntType(), Integer.class},
+
+ {BIGINT(), new BigIntType(), Long.class},
+
+ {FLOAT(), new FloatType(), Float.class},
+
+ {DOUBLE(), new DoubleType(), Double.class},
+
+ {DATE(), new DateType(), java.time.LocalDate.class},
+
+ {TIME(3), new TimeType(3), java.time.LocalTime.class},
+
+ {TIMESTAMP(3), new TimestampType(3), java.time.LocalDateTime.class},
+
+ {TIMESTAMP_WITH_TIME_ZONE(3),
+ new ZonedTimestampType(3),
+ java.time.OffsetDateTime.class},
+
+ {TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
+ new LocalZonedTimestampType(3),
+ java.time.Instant.class},
+
+ {INTERVAL(MINUTE(), SECOND(3)),
+ new DayTimeIntervalType(MINUTE_TO_SECOND, DEFAULT_DAY_PRECISION, 3),
+ java.time.Duration.class},
+
+ {INTERVAL(MONTH()),
+ new YearMonthIntervalType(YearMonthResolution.MONTH),
+ java.time.Period.class},
+
+ {ARRAY(ARRAY(INT())),
+ new ArrayType(new ArrayType(new IntType())),
+ Integer[][].class},
+
+ {MULTISET(MULTISET(INT())),
+ new MultisetType(new MultisetType(new IntType())),
+ Map.class},
+
+ {MAP(INT(), SMALLINT()),
+ new MapType(new IntType(), new SmallIntType()),
+ Map.class},
+
+ {ROW(FIELD("field1", CHAR(2)), FIELD("field2", BOOLEAN())),
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("field1", new CharType(2)),
+ new RowType.RowField("field2", new BooleanType()))),
+ Row.class},
+
+ {NULL(), new NullType(), Object.class},
+
+ {ANY(Types.GENERIC(DataTypesTest.class)),
+ new TypeInformationAnyType<>(Types.GENERIC(DataTypesTest.class)),
+ DataTypesTest.class},
+
+ {ANY(Void.class, VoidSerializer.INSTANCE),
+ new AnyType<>(Void.class, VoidSerializer.INSTANCE),
+ Void.class}
+ }
+ );
+ }
+
+ @Parameter
+ public DataType dataType;
+
+ @Parameter(1)
+ public LogicalType expectedLogicalType;
+
+ @Parameter(2)
+ public Class<?> expectedConversionClass;
+
+ @Test
+ public void testLogicalType() {
+ assertThat(dataType, hasLogicalType(expectedLogicalType));
+ }
+
+ @Test
+ public void testConversionClass() {
+ assertThat(dataType, hasConversionClass(expectedConversionClass));
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java
new file mode 100644
index 0000000..16fb4d0
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+
+/**
+ * Utilities for testing types.
+ */
+public class TypeTestingUtils {
+
+ public static Matcher<DataType> hasLogicalType(LogicalType logicalType) {
+ return new FeatureMatcher<DataType, LogicalType>(
+ CoreMatchers.equalTo(logicalType),
+ "logical type of the data type",
+ "logical type") {
+
+ @Override
+ protected LogicalType featureValueOf(DataType actual) {
+ return actual.getLogicalType();
+ }
+ };
+ }
+
+ public static Matcher<DataType> hasConversionClass(Class<?> clazz) {
+ return new FeatureMatcher<DataType, Class<?>>(
+ CoreMatchers.equalTo(clazz),
+ "conversion class of the data type",
+ "conversion class") {
+
+ @Override
+ protected Class<?> featureValueOf(DataType actual) {
+ return actual.getConversionClass();
+ }
+ };
+ }
+
+ public static Matcher<DataType> hasNullability(boolean isNullable) {
+ return new FeatureMatcher<DataType, Boolean>(
+ CoreMatchers.equalTo(isNullable),
+ "nullability of the data type",
+ "nullability") {
+
+ @Override
+ protected Boolean featureValueOf(DataType actual) {
+ return actual.getLogicalType().isNullable();
+ }
+ };
+ }
+}