Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/09 12:36:22 UTC

[flink] branch master updated (7b3678f -> bb13989)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7b3678f  [FLINK-11974][table-planner-blink] Use StreamOperatorFactory in Blink runtime
     new 7ccac27  [hotfix][table-common] Make type parameter constants accessible
     new 1cb9d68  [FLINK-12393][table-common] Add the user-facing classes of the new type system
     new 12f3480  [hotfix][table-common] Add description to user-defined type attributes
     new 0a03f1c  [hotfix][table-common] Validate fields of row type
     new 7bf92f7  [hotfix][table-common] Allow boxed conversions when primitive types are allowed
     new bb13989  [hotfix][table-common] Remove COLLECTION family from MAP type root

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/table/api/DataTypes.java | 740 +++++++++++++++++++++
 .../apache/flink/table/types/AtomicDataType.java   |  64 ++
 .../flink/table/types/CollectionDataType.java      | 113 ++++
 .../org/apache/flink/table/types/DataType.java     | 163 +++++
 .../apache/flink/table/types/FieldsDataType.java   | 106 +++
 .../apache/flink/table/types/KeyValueDataType.java | 113 ++++
 .../flink/table/types/logical/BinaryType.java      |   6 +-
 .../apache/flink/table/types/logical/CharType.java |   6 +-
 .../apache/flink/table/types/logical/DateType.java |   4 +-
 .../table/types/logical/DayTimeIntervalType.java   |  18 +-
 .../flink/table/types/logical/DecimalType.java     |  10 +-
 .../types/logical/LocalZonedTimestampType.java     |  12 +-
 .../flink/table/types/logical/LogicalTypeRoot.java |   1 -
 .../apache/flink/table/types/logical/RowType.java  |  26 +-
 .../flink/table/types/logical/StructuredType.java  |  21 +-
 .../apache/flink/table/types/logical/TimeType.java |  12 +-
 .../flink/table/types/logical/TimestampType.java   |   6 +-
 .../flink/table/types/logical/VarBinaryType.java   |   6 +-
 .../flink/table/types/logical/VarCharType.java     |   6 +-
 .../table/types/logical/YearMonthIntervalType.java |  12 +-
 .../table/types/logical/ZonedTimestampType.java    |   6 +-
 .../org/apache/flink/table/types/DataTypeTest.java | 131 ++++
 .../apache/flink/table/types/DataTypesTest.java    | 211 ++++++
 .../apache/flink/table/types/LogicalTypesTest.java |  23 +-
 .../apache/flink/table/types/TypeTestingUtils.java |  70 ++
 25 files changed, 1835 insertions(+), 51 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java


[flink] 05/06: [hotfix][table-common] Allow boxed conversions when primitive types are allowed

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7bf92f723c7e5834abffedfd900f2cac4325f13f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 8 11:12:33 2019 +0200

    [hotfix][table-common] Allow boxed conversions when primitive types are allowed
---
 .../main/java/org/apache/flink/table/types/logical/DateType.java    | 4 +++-
 .../org/apache/flink/table/types/logical/DayTimeIntervalType.java   | 6 +++++-
 .../apache/flink/table/types/logical/LocalZonedTimestampType.java   | 6 +++++-
 .../main/java/org/apache/flink/table/types/logical/TimeType.java    | 6 +++++-
 .../org/apache/flink/table/types/logical/YearMonthIntervalType.java | 6 +++++-
 5 files changed, 23 insertions(+), 5 deletions(-)
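
For illustration, a minimal sketch (not part of the patch) of what the boxed
conversions enable, assuming DataType#bridgedTo checks the requested class
against the conversion sets patched below and DataType#notNull behaves as
outlined in FLIP-37:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class BoxedConversionSketch {
        public static void main(String[] args) {
            // DATE is nullable by default; a primitive int cannot represent
            // NULL, so the boxed Integer conversion added here is required
            DataType nullableDate = DataTypes.DATE().bridgedTo(Integer.class);

            // the primitive conversion remains valid for NOT NULL types
            DataType notNullDate = DataTypes.DATE().notNull().bridgedTo(int.class);
        }
    }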

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DateType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DateType.java
index 6c81a71..fbab0f8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DateType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DateType.java
@@ -39,11 +39,13 @@ public final class DateType extends LogicalType {
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
 		java.sql.Date.class.getName(),
-		java.time.LocalDate.class.getName());
+		java.time.LocalDate.class.getName(),
+		Integer.class.getName());
 
 	private static final Set<String> NOT_NULL_INPUT_OUTPUT_CONVERSION = conversionSet(
 		java.sql.Date.class.getName(),
 		java.time.LocalDate.class.getName(),
+		Integer.class.getName(),
 		int.class.getName());
 
 	private static final Class<?> DEFAULT_CONVERSION = java.time.LocalDate.class;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
index 56656ef..a9eb21c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
@@ -50,6 +50,8 @@ import java.util.Set;
  * default. If no {@code p2} is specified, it is equal to 6 by default.
  *
  * <p>A conversion from and to {@code long} describes the number of milliseconds.
+ *
+ * @see YearMonthIntervalType
  */
 @PublicEvolving
 public final class DayTimeIntervalType extends LogicalType {
@@ -87,10 +89,12 @@ public final class DayTimeIntervalType extends LogicalType {
 	private static final String SECOND_FORMAT = "INTERVAL SECOND(%2$d)";
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
-		java.time.Duration.class.getName());
+		java.time.Duration.class.getName(),
+		Long.class.getName());
 
 	private static final Set<String> NOT_NULL_INPUT_OUTPUT_CONVERSION = conversionSet(
 		java.time.Duration.class.getName(),
+		Long.class.getName(),
 		long.class.getName());
 
 	private static final Class<?> DEFAULT_CONVERSION = java.time.Duration.class;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
index 3c7d39f..8352d47 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
@@ -62,11 +62,15 @@ public final class LocalZonedTimestampType extends LogicalType {
 	private static final String FORMAT = "TIMESTAMP(%d) WITH LOCAL TIME ZONE";
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
-		java.time.Instant.class.getName());
+		java.time.Instant.class.getName(),
+		Integer.class.getName(),
+		Long.class.getName());
 
 	private static final Set<String> NOT_NULL_INPUT_OUTPUT_CONVERSION = conversionSet(
 		java.time.Instant.class.getName(),
+		Integer.class.getName(),
 		int.class.getName(),
+		Long.class.getName(),
 		long.class.getName());
 
 	private static final Class<?> DEFAULT_CONVERSION = java.time.Instant.class;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
index 3e8e7c1..f55ddf3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
@@ -53,12 +53,16 @@ public final class TimeType extends LogicalType {
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
 		java.sql.Time.class.getName(),
-		java.time.LocalTime.class.getName());
+		java.time.LocalTime.class.getName(),
+		Integer.class.getName(),
+		Long.class.getName());
 
 	private static final Set<String> NOT_NULL_INPUT_OUTPUT_CONVERSION = conversionSet(
 		java.sql.Time.class.getName(),
 		java.time.LocalTime.class.getName(),
+		Integer.class.getName(),
 		int.class.getName(),
+		Long.class.getName(),
 		long.class.getName());
 
 	private static final Class<?> DEFAULT_CONVERSION = java.time.LocalTime.class;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
index c28f920..8891398 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
@@ -43,6 +43,8 @@ import java.util.Set;
  *
  * <p>A conversion from and to {@code int} describes the number of months. A conversion from
  * {@link java.time.Period} ignores the {@code days} part.
+ *
+ * @see DayTimeIntervalType
  */
 @PublicEvolving
 public final class YearMonthIntervalType extends LogicalType {
@@ -60,10 +62,12 @@ public final class YearMonthIntervalType extends LogicalType {
 	private static final String MONTH_FORMAT = "INTERVAL MONTH";
 
 	private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(
-		java.time.Period.class.getName());
+		java.time.Period.class.getName(),
+		Integer.class.getName());
 
 	private static final Set<String> NOT_NULL_INPUT_OUTPUT_CONVERSION = conversionSet(
 		java.time.Period.class.getName(),
+		Integer.class.getName(),
 		int.class.getName());
 
 	private static final Class<?> DEFAULT_CONVERSION = java.time.Period.class;


[flink] 03/06: [hotfix][table-common] Add description to user-defined type attributes

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12f34808fd87816726a5a8ce74146bd07265c1c3
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 7 14:55:25 2019 +0200

    [hotfix][table-common] Add description to user-defined type attributes
---
 .../flink/table/types/logical/StructuredType.java   | 21 +++++++++++++++++----
 .../apache/flink/table/types/LogicalTypesTest.java  |  2 +-
 2 files changed, 18 insertions(+), 5 deletions(-)
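
A hedged usage sketch of the new optional description (not part of the
patch); both constructors below are the ones from the diff:

    import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
    import org.apache.flink.table.types.logical.VarCharType;

    public class AttributeDescriptionSketch {
        public static void main(String[] args) {
            // attribute with a description
            StructuredAttribute described =
                new StructuredAttribute("name", new VarCharType(), "Full name.");
            described.getDescription(); // Optional.of("Full name.")

            // the old two-argument constructor delegates with a null
            // description, so existing callers keep working
            StructuredAttribute plain =
                new StructuredAttribute("name", new VarCharType());
            plain.getDescription();     // Optional.empty()
        }
    }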

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
index f76ad6b..44bd0ad 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/StructuredType.java
@@ -71,9 +71,16 @@ public final class StructuredType extends UserDefinedType {
 
 		private final LogicalType type;
 
-		public StructuredAttribute(String name, LogicalType type) {
+		private final @Nullable String description;
+
+		public StructuredAttribute(String name, LogicalType type, @Nullable String description) {
 			this.name = Preconditions.checkNotNull(name, "Attribute name must not be null.");
 			this.type = Preconditions.checkNotNull(type, "Attribute type must not be null.");
+			this.description = description;
+		}
+
+		public StructuredAttribute(String name, LogicalType type) {
+			this(name, type, null);
 		}
 
 		public String getName() {
@@ -84,8 +91,12 @@ public final class StructuredType extends UserDefinedType {
 			return type;
 		}
 
+		public Optional<String> getDescription() {
+			return Optional.ofNullable(description);
+		}
+
 		public StructuredAttribute copy() {
-			return new StructuredAttribute(name, type.copy());
+			return new StructuredAttribute(name, type.copy(), description);
 		}
 
 		@Override
@@ -97,12 +108,14 @@ public final class StructuredType extends UserDefinedType {
 				return false;
 			}
 			StructuredAttribute that = (StructuredAttribute) o;
-			return name.equals(that.name) && type.equals(that.type);
+			return name.equals(that.name) &&
+				type.equals(that.type) &&
+				Objects.equals(description, that.description);
 		}
 
 		@Override
 		public int hashCode() {
-			return Objects.hash(name, type);
+			return Objects.hash(name, type, description);
 		}
 	}
 
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index e03e15d..b8b38c5 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -602,7 +602,7 @@ public class LogicalTypesTest {
 		return new StructuredType.Builder(
 				new UserDefinedType.TypeIdentifier("cat", "db", "Human"),
 				Collections.singletonList(
-					new StructuredType.StructuredAttribute("name", UDT_NAME_TYPE)))
+					new StructuredType.StructuredAttribute("name", UDT_NAME_TYPE, "Description.")))
 			.setDescription("Human type desc.")
 			.setFinal(false)
 			.setInstantiable(false)


[flink] 04/06: [hotfix][table-common] Validate fields of row type

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a03f1c49c97ae428db446043670b7dfea48d307
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 7 15:34:24 2019 +0200

    [hotfix][table-common] Validate fields of row type
---
 .../apache/flink/table/types/logical/RowType.java  | 26 ++++++++++++++++++++--
 .../apache/flink/table/types/LogicalTypesTest.java | 21 +++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
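
A short sketch of the new construction-time validation, mirroring the test
added below (illustration only):

    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    import java.util.Arrays;

    public class RowValidationSketch {
        public static void main(String[] args) {
            try {
                // duplicate field names are now rejected
                new RowType(Arrays.asList(
                    new RowType.RowField("a", new VarCharType()),
                    new RowType.RowField("a", new VarCharType())));
            } catch (ValidationException e) {
                // "Field names must be unique. Found duplicates: [a]"
            }
        }
    }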

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
index 24630f0..3f6147e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.types.logical;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -41,8 +43,8 @@ import java.util.stream.Collectors;
  * complex structures.
  *
  * <p>The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
- * {@code n} is the name of a field, {@code t} is the logical type of a field, {@code d} is the description
- * of a field.
+ * {@code n} is the unique name of a field, {@code t} is the logical type of a field, {@code d} is
+ * the description of a field.
  */
 @PublicEvolving
 public final class RowType extends LogicalType {
@@ -149,6 +151,8 @@ public final class RowType extends LogicalType {
 		this.fields = Collections.unmodifiableList(
 			new ArrayList<>(
 				Preconditions.checkNotNull(fields, "Fields must not be null.")));
+
+		validateFields(fields);
 	}
 
 	public RowType(List<RowField> fields) {
@@ -231,4 +235,22 @@ public final class RowType extends LogicalType {
 	public int hashCode() {
 		return Objects.hash(super.hashCode(), fields);
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateFields(List<RowField> fields) {
+		final List<String> fieldNames = fields.stream()
+			.map(f -> f.name)
+			.collect(Collectors.toList());
+		if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
+			throw new ValidationException("Field names must contain at least one non-whitespace character.");
+		}
+		final Set<String> duplicates = fieldNames.stream()
+			.filter(n -> Collections.frequency(fieldNames, n) > 1)
+			.collect(Collectors.toSet());
+		if (!duplicates.isEmpty()) {
+			throw new ValidationException(
+				String.format("Field names must be unique. Found duplicates: %s", duplicates));
+		}
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index b8b38c5..60f37b2 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.AnyType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -68,6 +69,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test for subclasses of {@link org.apache.flink.table.types.logical.LogicalType}.
@@ -401,6 +403,25 @@ public class LogicalTypesTest {
 					new RowType.RowField("a", new VarCharType(), "Different desc."),
 					new RowType.RowField("b`", new TimestampType())))
 		);
+
+		try {
+			new RowType(
+				Arrays.asList(
+					new RowType.RowField("b", new VarCharType()),
+					new RowType.RowField("b", new VarCharType()),
+					new RowType.RowField("a", new VarCharType()),
+					new RowType.RowField("a", new TimestampType())));
+			fail("Not unique fields expected.");
+		} catch (ValidationException e) {
+			// ok
+		}
+
+		try {
+			new RowType(Collections.singletonList(new RowType.RowField("", new VarCharType())));
+			fail("Invalid name.");
+		} catch (ValidationException e) {
+			// ok
+		}
 	}
 
 	@Test


[flink] 01/06: [hotfix][table-common] Make type parameter constants accessible

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ccac275e3104e7ad6b008b4f75fedcfbf57bc1a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 7 14:09:43 2019 +0200

    [hotfix][table-common] Make type parameter constants accessible
---
 .../org/apache/flink/table/types/logical/BinaryType.java     |  6 +++---
 .../java/org/apache/flink/table/types/logical/CharType.java  |  6 +++---
 .../flink/table/types/logical/DayTimeIntervalType.java       | 12 ++++++------
 .../org/apache/flink/table/types/logical/DecimalType.java    | 10 +++++-----
 .../flink/table/types/logical/LocalZonedTimestampType.java   |  6 +++---
 .../java/org/apache/flink/table/types/logical/TimeType.java  |  6 +++---
 .../org/apache/flink/table/types/logical/TimestampType.java  |  6 +++---
 .../org/apache/flink/table/types/logical/VarBinaryType.java  |  6 +++---
 .../org/apache/flink/table/types/logical/VarCharType.java    |  6 +++---
 .../flink/table/types/logical/YearMonthIntervalType.java     |  6 +++---
 .../apache/flink/table/types/logical/ZonedTimestampType.java |  6 +++---
 11 files changed, 38 insertions(+), 38 deletions(-)
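
With the bounds public, other layers can reference them directly; a minimal
sketch (not part of the patch):

    import org.apache.flink.table.types.logical.CharType;
    import org.apache.flink.table.types.logical.DecimalType;

    public class TypeConstantsSketch {
        public static void main(String[] args) {
            // formerly private constants, now usable e.g. for validation
            // messages or for constructing types with default parameters
            int maxCharLength = CharType.MAX_LENGTH;                      // 255
            int defaultDecimalPrecision = DecimalType.DEFAULT_PRECISION;  // 10
        }
    }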

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
index 2ae1af2..e342d9e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
@@ -36,11 +36,11 @@ import java.util.Set;
 @PublicEvolving
 public final class BinaryType extends LogicalType {
 
-	private static final int MIN_LENGTH = 1;
+	public static final int MIN_LENGTH = 1;
 
-	private static final int MAX_LENGTH = Integer.MAX_VALUE;
+	public static final int MAX_LENGTH = Integer.MAX_VALUE;
 
-	private static final int DEFAULT_LENGTH = 1;
+	public static final int DEFAULT_LENGTH = 1;
 
 	private static final String FORMAT = "BINARY(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
index 1168370..fe870ce 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
@@ -38,11 +38,11 @@ import java.util.Set;
 @PublicEvolving
 public final class CharType extends LogicalType {
 
-	private static final int MIN_LENGTH = 1;
+	public static final int MIN_LENGTH = 1;
 
-	private static final int MAX_LENGTH = 255;
+	public static final int MAX_LENGTH = 255;
 
-	private static final int DEFAULT_LENGTH = 1;
+	public static final int DEFAULT_LENGTH = 1;
 
 	private static final String FORMAT = "CHAR(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
index ca5b3168..56656ef 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DayTimeIntervalType.java
@@ -54,17 +54,17 @@ import java.util.Set;
 @PublicEvolving
 public final class DayTimeIntervalType extends LogicalType {
 
-	private static final int MIN_DAY_PRECISION = 1;
+	public static final int MIN_DAY_PRECISION = 1;
 
-	private static final int MAX_DAY_PRECISION = 6;
+	public static final int MAX_DAY_PRECISION = 6;
 
-	private static final int DEFAULT_DAY_PRECISION = 2;
+	public static final int DEFAULT_DAY_PRECISION = 2;
 
-	private static final int MIN_FRACTIONAL_PRECISION = 0;
+	public static final int MIN_FRACTIONAL_PRECISION = 0;
 
-	private static final int MAX_FRACTIONAL_PRECISION = 9;
+	public static final int MAX_FRACTIONAL_PRECISION = 9;
 
-	private static final int DEFAULT_FRACTIONAL_PRECISION = 6;
+	public static final int DEFAULT_FRACTIONAL_PRECISION = 6;
 
 	private static final String DAY_FORMAT = "INTERVAL DAY(%1$d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
index 017bcd4..c11ede0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
@@ -39,15 +39,15 @@ import java.util.Set;
 @PublicEvolving
 public final class DecimalType extends LogicalType {
 
-	private static final int MIN_PRECISION = 1;
+	public static final int MIN_PRECISION = 1;
 
-	private static final int MAX_PRECISION = 38;
+	public static final int MAX_PRECISION = 38;
 
-	private static final int DEFAULT_PRECISION = 10;
+	public static final int DEFAULT_PRECISION = 10;
 
-	private static final int MIN_SCALE = 0;
+	public static final int MIN_SCALE = 0;
 
-	private static final int DEFAULT_SCALE = 0;
+	public static final int DEFAULT_SCALE = 0;
 
 	private static final String FORMAT = "DECIMAL(%d, %d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
index 43934d5..3c7d39f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
@@ -53,11 +53,11 @@ import java.util.Set;
 @PublicEvolving
 public final class LocalZonedTimestampType extends LogicalType {
 
-	private static final int MIN_PRECISION = 0;
+	public static final int MIN_PRECISION = 0;
 
-	private static final int MAX_PRECISION = 9;
+	public static final int MAX_PRECISION = 9;
 
-	private static final int DEFAULT_PRECISION = 6;
+	public static final int DEFAULT_PRECISION = 6;
 
 	private static final String FORMAT = "TIMESTAMP(%d) WITH LOCAL TIME ZONE";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
index f1e685c..3e8e7c1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
@@ -43,11 +43,11 @@ import java.util.Set;
 @PublicEvolving
 public final class TimeType extends LogicalType {
 
-	private static final int MIN_PRECISION = 0;
+	public static final int MIN_PRECISION = 0;
 
-	private static final int MAX_PRECISION = 9;
+	public static final int MAX_PRECISION = 9;
 
-	private static final int DEFAULT_PRECISION = 0;
+	public static final int DEFAULT_PRECISION = 0;
 
 	private static final String FORMAT = "TIME(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
index b2b2d64..8092726 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
@@ -47,11 +47,11 @@ import java.util.Set;
 @PublicEvolving
 public final class TimestampType extends LogicalType {
 
-	private static final int MIN_PRECISION = 0;
+	public static final int MIN_PRECISION = 0;
 
-	private static final int MAX_PRECISION = 9;
+	public static final int MAX_PRECISION = 9;
 
-	private static final int DEFAULT_PRECISION = 6;
+	public static final int DEFAULT_PRECISION = 6;
 
 	private static final String FORMAT = "TIMESTAMP(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java
index 7ed6b5a..8322518 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarBinaryType.java
@@ -36,11 +36,11 @@ import java.util.Set;
 @PublicEvolving
 public final class VarBinaryType extends LogicalType {
 
-	private static final int MIN_LENGTH = 1;
+	public static final int MIN_LENGTH = 1;
 
-	private static final int MAX_LENGTH = Integer.MAX_VALUE;
+	public static final int MAX_LENGTH = Integer.MAX_VALUE;
 
-	private static final int DEFAULT_LENGTH = 1;
+	public static final int DEFAULT_LENGTH = 1;
 
 	private static final String FORMAT = "VARBINARY(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
index c955c0a..4092558 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/VarCharType.java
@@ -38,11 +38,11 @@ import java.util.Set;
 @PublicEvolving
 public final class VarCharType extends LogicalType {
 
-	private static final int MIN_LENGTH = 1;
+	public static final int MIN_LENGTH = 1;
 
-	private static final int MAX_LENGTH = Integer.MAX_VALUE;
+	public static final int MAX_LENGTH = Integer.MAX_VALUE;
 
-	private static final int DEFAULT_LENGTH = 1;
+	public static final int DEFAULT_LENGTH = 1;
 
 	private static final String FORMAT = "VARCHAR(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
index cfe7952..c28f920 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/YearMonthIntervalType.java
@@ -47,11 +47,11 @@ import java.util.Set;
 @PublicEvolving
 public final class YearMonthIntervalType extends LogicalType {
 
-	private static final int MIN_PRECISION = 1;
+	public static final int MIN_PRECISION = 1;
 
-	private static final int MAX_PRECISION = 4;
+	public static final int MAX_PRECISION = 4;
 
-	private static final int DEFAULT_PRECISION = 2;
+	public static final int DEFAULT_PRECISION = 2;
 
 	private static final String YEAR_FORMAT = "INTERVAL YEAR(%d)";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
index b005472..4230bb8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
@@ -49,11 +49,11 @@ import java.util.Set;
 @PublicEvolving
 public final class ZonedTimestampType extends LogicalType {
 
-	private static final int MIN_PRECISION = 0;
+	public static final int MIN_PRECISION = 0;
 
-	private static final int MAX_PRECISION = 9;
+	public static final int MAX_PRECISION = 9;
 
-	private static final int DEFAULT_PRECISION = 6;
+	public static final int DEFAULT_PRECISION = 6;
 
 	private static final String FORMAT = "TIMESTAMP(%d) WITH TIME ZONE";
 


[flink] 02/06: [FLINK-12393][table-common] Add the user-facing classes of the new type system

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1cb9d680cbbc46e67de10acc99c5fb510b9382e5
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri May 3 13:52:23 2019 +0200

    [FLINK-12393][table-common] Add the user-facing classes of the new type system
    
    Introduces the DataType class and subclasses. Users are able to do a star import
    of DataTypes and declare types like `MULTISET(MULTISET(INT()))`, which is close
    to SQL. As mentioned in FLIP-37, data types allow specifying format hints to the
    planner using `TIMESTAMP(9).bridgedTo(java.sql.Timestamp)`.
    
    This closes #8360.
---
 .../java/org/apache/flink/table/api/DataTypes.java | 740 +++++++++++++++++++++
 .../apache/flink/table/types/AtomicDataType.java   |  64 ++
 .../flink/table/types/CollectionDataType.java      | 113 ++++
 .../org/apache/flink/table/types/DataType.java     | 163 +++++
 .../apache/flink/table/types/FieldsDataType.java   | 106 +++
 .../apache/flink/table/types/KeyValueDataType.java | 113 ++++
 .../org/apache/flink/table/types/DataTypeTest.java | 131 ++++
 .../apache/flink/table/types/DataTypesTest.java    | 211 ++++++
 .../apache/flink/table/types/TypeTestingUtils.java |  70 ++
 9 files changed, 1711 insertions(+)
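
A hedged sketch of the intended usage, based on the commit message above and
the factory methods added in DataTypes.java below (in actual Java code the
bridging class is written as java.sql.Timestamp.class):

    import static org.apache.flink.table.api.DataTypes.*;

    import org.apache.flink.table.types.DataType;

    public class DataTypesSketch {
        public static void main(String[] args) {
            // nested collection type, close to SQL: MULTISET<MULTISET<INT>>
            DataType multiset = MULTISET(MULTISET(INT()));

            // row type with named fields and an optional field description
            DataType row = ROW(
                FIELD("id", BIGINT()),
                FIELD("name", STRING(), "user-visible name"));

            // format hint to the planner, as mentioned in FLIP-37
            DataType ts = TIMESTAMP(9).bridgedTo(java.sql.Timestamp.class);
        }
    }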

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
new file mode 100644
index 0000000..2591227
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link DataType} can be used to declare input and/or output types of operations. This class
+ * enumerates all supported data types of the Table & SQL API.
+ */
+@PublicEvolving
+public final class DataTypes {
+
+	// we use SQL-like naming for data types and avoid Java keyword clashes
+	// CHECKSTYLE.OFF: MethodName
+
+	/**
+	 * Data type of a fixed-length character string {@code CHAR(n)} where {@code n} is the number
+	 * of code points. {@code n} must have a value between 1 and 255 (both inclusive).
+	 *
+	 * @see CharType
+	 */
+	public static DataType CHAR(int n) {
+		return new AtomicDataType(new CharType(n));
+	}
+
+	/**
+	 * Data type of a variable-length character string {@code VARCHAR(n)} where {@code n} is the
+	 * maximum number of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+	 * (both inclusive).
+	 *
+	 * @see VarCharType
+	 */
+	public static DataType VARCHAR(int n) {
+		return new AtomicDataType(new VarCharType(n));
+	}
+
+	/**
+	 * Data type of a variable-length character string with defined maximum length. This is a shortcut
+	 * for {@code VARCHAR(2147483647)} for representing JVM strings.
+	 *
+	 * @see VarCharType
+	 */
+	public static DataType STRING() {
+		return VARCHAR(Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Data type of a boolean with a (possibly) three-valued logic of {@code TRUE, FALSE, UNKNOWN}.
+	 *
+	 * @see BooleanType
+	 */
+	public static DataType BOOLEAN() {
+		return new AtomicDataType(new BooleanType());
+	}
+
+	/**
+	 * Data type of a fixed-length binary string (=a sequence of bytes) {@code BINARY(n)} where
+	 * {@code n} is the number of bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+	 * (both inclusive).
+	 *
+	 * @see BinaryType
+	 */
+	public static DataType BINARY(int n) {
+		return new AtomicDataType(new BinaryType(n));
+	}
+
+	/**
+	 * Data type of a variable-length binary string (=a sequence of bytes) {@code VARBINARY(n)} where
+	 * {@code n} is the maximum number of bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE}
+	 * (both inclusive).
+	 *
+	 * @see VarBinaryType
+	 */
+	public static DataType VARBINARY(int n) {
+		return new AtomicDataType(new VarBinaryType(n));
+	}
+
+	/**
+	 * Data type of a variable-length binary string (=a sequence of bytes) with defined maximum length.
+	 * This is a shortcut for {@code VARBINARY(2147483647)} for representing JVM byte arrays.
+	 *
+	 * @see VarBinaryType
+	 */
+	public static DataType BYTES() {
+		return VARBINARY(Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Data type of a decimal number with fixed precision and scale {@code DECIMAL(p, s)} where {@code p}
+	 * is the number of digits in a number (=precision) and {@code s} is the number of digits to the
+	 * right of the decimal point in a number (=scale). {@code p} must have a value between 1 and 38
+	 * (both inclusive). {@code s} must have a value between 0 and {@code p} (both inclusive).
+	 *
+	 * @see DecimalType
+	 */
+	public static DataType DECIMAL(int precision, int scale) {
+		return new AtomicDataType(new DecimalType(precision, scale));
+	}
+
+	/**
+	 * Data type of a 1-byte signed integer with values from -128 to 127.
+	 *
+	 * @see TinyIntType
+	 */
+	public static DataType TINYINT() {
+		return new AtomicDataType(new TinyIntType());
+	}
+
+	/**
+	 * Data type of a 2-byte signed integer with values from -32,768 to 32,767.
+	 *
+	 * @see SmallIntType
+	 */
+	public static DataType SMALLINT() {
+		return new AtomicDataType(new SmallIntType());
+	}
+
+	/**
+	 * Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
+	 *
+	 * @see IntType
+	 */
+	public static DataType INT() {
+		return new AtomicDataType(new IntType());
+	}
+
+	/**
+	 * Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to
+	 * 9,223,372,036,854,775,807.
+	 *
+	 * @see BigIntType
+	 */
+	public static DataType BIGINT() {
+		return new AtomicDataType(new BigIntType());
+	}
+
+	/**
+	 * Data type of a 4-byte single precision floating point number.
+	 *
+	 * @see FloatType
+	 */
+	public static DataType FLOAT() {
+		return new AtomicDataType(new FloatType());
+	}
+
+	/**
+	 * Data type of an 8-byte double precision floating point number.
+	 *
+	 * @see DoubleType
+	 */
+	public static DataType DOUBLE() {
+		return new AtomicDataType(new DoubleType());
+	}
+
+	/**
+	 * Data type of a date consisting of {@code year-month-day} with values ranging from {@code 0000-01-01}
+	 * to {@code 9999-12-31}.
+	 *
+	 * <p>Compared to the SQL standard, the range starts at year {@code 0000}.
+	 *
+	 * @see DataType
+	 */
+	public static DataType DATE() {
+		return new AtomicDataType(new DateType());
+	}
+
+	/**
+	 * Data type of a time WITHOUT time zone {@code TIME(p)} where {@code p} is the number of digits
+	 * of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both inclusive).
+	 *
+	 * <p>An instance consists of {@code hour:minute:second[.fractional]} with up to nanosecond precision
+	 * and values ranging from {@code 00:00:00.000000000} to {@code 23:59:59.999999999}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
+	 *
+	 * @see TimeType
+	 */
+	public static DataType TIME(int precision) {
+		return new AtomicDataType(new TimeType(precision));
+	}
+
+	/**
+	 * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP(p)} where {@code p} is the number
+	 * of digits of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both
+	 * inclusive).
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional]} with up to
+	 * nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000} to
+	 * {@code 9999-12-31 23:59:59.999999999}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalDateTime}.
+	 *
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see TimestampType
+	 */
+	public static DataType TIMESTAMP(int precision) {
+		return new AtomicDataType(new TimestampType(precision));
+	}
+
+	/**
+	 * Data type of a timestamp WITH time zone {@code TIMESTAMP(p) WITH TIME ZONE} where {@code p} is
+	 * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0
+	 * and 9 (both inclusive).
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999999 -14:59}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.OffsetDateTime}.
+	 *
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see ZonedTimestampType
+	 */
+	public static DataType TIMESTAMP_WITH_TIME_ZONE(int precision) {
+		return new AtomicDataType(new ZonedTimestampType(precision));
+	}
+
+	/**
+	 * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where
+	 * {@code p} is the number of digits of fractional seconds (=precision). {@code p} must have a value
+	 * between 0 and 9 (both inclusive).
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not supported
+	 * as the semantics are closer to {@link java.time.OffsetDateTime}.
+	 *
+	 * <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored physically
+	 * in every datum. Instead, the type assumes {@link java.time.Instant} semantics in UTC time zone
+	 * at the edges of the table ecosystem. Every datum is interpreted in the local time zone configured
+	 * in the current session for computation and visualization.
+	 *
+	 * <p>This type fills the gap between time zone free and time zone mandatory timestamp types by
+	 * allowing the interpretation of UTC timestamps according to the configured session timezone.
+	 *
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see LocalZonedTimestampType
+	 */
+	public static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precision) {
+		return new AtomicDataType(new LocalZonedTimestampType(precision));
+	}
+
+	/**
+	 * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
+	 * with up to nanosecond granularity or year-month intervals with up to month granularity.
+	 *
+	 * <p>An interval of day-time consists of {@code +days hours:minutes:seconds.fractional} with values
+	 * ranging from {@code -999999 23:59:59.999999999} to {@code +999999 23:59:59.999999999}. The type
+	 * must be parameterized to one of the following resolutions: interval of days, interval of days to
+	 * hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of
+	 * hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds,
+	 * or interval of seconds. The value representation is the same for all types of resolutions. For
+	 * example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds
+	 * format (with default precisions): {@code +00 00:01:10.000000}.
+	 *
+	 * <p>An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11}
+	 * to {@code +9999-11}. The type must be parameterized to one of the following resolutions: interval
+	 * of years, interval of years to months, or interval of months. The value representation is the
+	 * same for all types of resolutions. For example, an interval of months of 50 is always represented
+	 * in an interval-of-years-to-months format (with default year precision): {@code +04-02}.
+	 *
+	 * <p>Examples: {@code INTERVAL(DAY(2))} for a day-time interval or {@code INTERVAL(YEAR(4))} for
+	 * a year-month interval.
+	 *
+	 * @see DayTimeIntervalType
+	 * @see YearMonthIntervalType
+	 */
+	public static DataType INTERVAL(Resolution resolution) {
+		Preconditions.checkNotNull(resolution, "Interval resolution must not be null.");
+		return new AtomicDataType(Resolution.resolveInterval(resolution, null));
+	}
+
+	/**
+	 * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
+	 * with up to nanosecond granularity or year-month intervals with up to month granularity.
+	 *
+	 * <p>An interval of day-time consists of {@code +days hours:minutes:seconds.fractional} with values
+	 * ranging from {@code -999999 23:59:59.999999999} to {@code +999999 23:59:59.999999999}. The type
+	 * must be parameterized to one of the following resolutions: interval of days, interval of days to
+	 * hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of
+	 * hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds,
+	 * or interval of seconds. The value representation is the same for all types of resolutions. For
+	 * example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds
+	 * format (with default precisions): {@code +00 00:01:10.000000}.
+	 *
+	 * <p>An interval of year-month consists of {@code +years-months} with values ranging from {@code -9999-11}
+	 * to {@code +9999-11}. The type must be parameterized to one of the following resolutions: interval
+	 * of years, interval of years to months, or interval of months. The value representation is the
+	 * same for all types of resolutions. For example, an interval of months of 50 is always represented
+	 * in an interval-of-years-to-months format (with default year precision): {@code +04-02}.
+	 *
+	 * <p>Examples: {@code INTERVAL(DAY(2), SECOND(9))} for a day-time interval or {@code INTERVAL(YEAR(4), MONTH())}
+	 * for a year-month interval.
+	 *
+	 * @see DayTimeIntervalType
+	 * @see YearMonthIntervalType
+	 */
+	public static DataType INTERVAL(Resolution upperResolution, Resolution lowerResolution) {
+		Preconditions.checkNotNull(upperResolution, "Upper interval resolution must not be null.");
+		Preconditions.checkNotNull(lowerResolution, "Lower interval resolution must not be null.");
+		return new AtomicDataType(Resolution.resolveInterval(upperResolution, lowerResolution));
+	}
+
+	/**
+	 * Data type of an array of elements with same subtype.
+	 *
+	 * <p>Compared to the SQL standard, the maximum cardinality of an array cannot be specified but
+	 * is fixed at {@link Integer#MAX_VALUE}. Also, any valid type is supported as a subtype.
+	 *
+	 * @see ArrayType
+	 */
+	public static DataType ARRAY(DataType elementDataType) {
+		Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+		return new CollectionDataType(new ArrayType(elementDataType.getLogicalType()), elementDataType);
+	}
+
+	/**
+	 * Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its
+	 * elements with a common subtype. Each unique value (including {@code NULL}) is mapped to some
+	 * multiplicity.
+	 *
+	 * <p>There is no restriction of element types; it is the responsibility of the user to ensure
+	 * uniqueness.
+	 *
+	 * @see MultisetType
+	 */
+	public static DataType MULTISET(DataType elementDataType) {
+		Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+		return new CollectionDataType(new MultisetType(elementDataType.getLogicalType()), elementDataType);
+	}
+
+	/**
+	 * Data type of an associative array that maps keys (including {@code NULL}) to values (including
+	 * {@code NULL}). A map cannot contain duplicate keys; each key can map to at most one value.
+	 *
+	 * <p>There is no restriction of key types; it is the responsibility of the user to ensure uniqueness.
+	 * The map type is an extension to the SQL standard.
+	 *
+	 * @see MapType
+	 */
+	public static DataType MAP(DataType keyDataType, DataType valueDataType) {
+		Preconditions.checkNotNull(keyDataType, "Key data type must not be null.");
+		Preconditions.checkNotNull(valueDataType, "Value data type must not be null.");
+		return new KeyValueDataType(
+			new MapType(keyDataType.getLogicalType(), valueDataType.getLogicalType()),
+			keyDataType,
+			valueDataType);
+	}
+
+	/**
+	 * Data type of a sequence of fields. A field consists of a field name, field type, and an optional
+	 * description. The most specific type of a row of a table is a row type. In this case, each column
+	 * of the row corresponds to the field of the row type that has the same ordinal position as the
+	 * column.
+	 *
+	 * <p>Compared to the SQL standard, an optional field description simplifies the handling with
+	 * complex structures.
+	 *
+	 * @see RowType
+	 */
+	public static DataType ROW(Field... fields) {
+		final List<RowType.RowField> logicalFields = Stream.of(fields)
+			.map(f -> Preconditions.checkNotNull(f, "Field definition must not be null."))
+			.map(f -> new RowType.RowField(f.name, f.dataType.getLogicalType(), f.description))
+			.collect(Collectors.toList());
+		final Map<String, DataType> fieldDataTypes = Stream.of(fields)
+			.collect(Collectors.toMap(f -> f.name, f -> f.dataType));
+		return new FieldsDataType(new RowType(logicalFields), fieldDataTypes);
+	}
+
+	/**
+	 * Data type for representing untyped {@code NULL} values. A null type has no other value except
+	 * {@code NULL}, thus, it can be cast to any nullable type similar to JVM semantics.
+	 *
+	 * <p>This type helps in representing unknown types in API calls that use a {@code NULL} literal
+	 * as well as bridging to formats such as JSON or Avro that define such a type as well.
+	 *
+	 * <p>The null type is an extension to the SQL standard.
+	 *
+	 * @see NullType
+	 */
+	public static DataType NULL() {
+		return new AtomicDataType(new NullType());
+	}
+
+	/**
+	 * Data type of an arbitrary serialized type. This type is a black box within the table ecosystem
+	 * and is only deserialized at the edges.
+	 *
+	 * <p>The any type is an extension to the SQL standard.
+	 *
+	 * <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link #ANY(TypeInformation)}
+	 * for generating a serializer from Flink's core type system automatically in subsequent layers.
+	 *
+	 * @param clazz originating value class
+	 * @param serializer type serializer
+	 *
+	 * @see AnyType
+	 */
+	public static <T> DataType ANY(Class<T> clazz, TypeSerializer<T> serializer) {
+		return new AtomicDataType(new AnyType<>(clazz, serializer));
+	}
+
+	/**
+	 * Data type of an arbitrary serialized type backed by {@link TypeInformation}. This type is
+	 * a black box within the table ecosystem and is only deserialized at the edges.
+	 *
+	 * <p>The any type is an extension to the SQL standard.
+	 *
+	 * <p>Compared to an {@link #ANY(Class, TypeSerializer)}, this type does not contain a {@link TypeSerializer}
+	 * yet. The serializer will be generated from the enclosed {@link TypeInformation} but needs access
+	 * to the {@link ExecutionConfig} of the current execution environment. Thus, this type is just a
+	 * placeholder.
+	 *
+	 * @see TypeInformationAnyType
+	 */
+	public static <T> DataType ANY(TypeInformation<T> typeInformation) {
+		return new AtomicDataType(new TypeInformationAnyType<>(typeInformation));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper functions
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Resolution in seconds with 6 digits for fractional seconds by default.
+	 *
+	 * @see #SECOND(int)
+	 */
+	public static Resolution SECOND() {
+		return new Resolution(Resolution.IntervalUnit.SECOND, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION);
+	}
+
+	/**
+	 * Resolution in seconds and (possibly) fractional seconds. The precision is the number of
+	 * digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). If
+	 * no fractional is specified, it is equal to 6 by default.
+	 *
+	 * @see #SECOND()
+	 */
+	public static Resolution SECOND(int precision) {
+		return new Resolution(Resolution.IntervalUnit.SECOND, precision);
+	}
+
+	/**
+	 * Resolution in minutes.
+	 */
+	public static Resolution MINUTE() {
+		return new Resolution(Resolution.IntervalUnit.MINUTE);
+	}
+
+	/**
+	 * Resolution in hours.
+	 */
+	public static Resolution HOUR() {
+		return new Resolution(Resolution.IntervalUnit.HOUR);
+	}
+
+	/**
+	 * Resolution in days. The precision is the number of digits of days. It must have a value
+	 * between 1 and 6 (both inclusive). If no precision is specified, it is equal to 2 by default.
+	 *
+	 * @see #DAY()
+	 */
+	public static Resolution DAY(int precision) {
+		return new Resolution(Resolution.IntervalUnit.DAY, precision);
+	}
+
+	/**
+	 * Resolution in days with 2 digits for the number of days by default.
+	 *
+	 * @see #DAY(int)
+	 */
+	public static Resolution DAY() {
+		return new Resolution(Resolution.IntervalUnit.DAY, DayTimeIntervalType.DEFAULT_DAY_PRECISION);
+	}
+
+	/**
+	 * Resolution in months.
+	 */
+	public static Resolution MONTH() {
+		return new Resolution(Resolution.IntervalUnit.MONTH);
+	}
+
+	/**
+	 * Resolution in years. The precision is the number of digits of years. It must have a value
+	 * between 1 and 4 (both inclusive). If no precision is specified, it is equal to 2 by default.
+	 *
+	 * @see #YEAR()
+	 */
+	public static Resolution YEAR(int precision) {
+		return new Resolution(Resolution.IntervalUnit.YEAR, precision);
+	}
+
+	/**
+	 * Resolution in years with 2 digits for the number of years by default.
+	 *
+	 * @see #YEAR(int)
+	 */
+	public static Resolution YEAR() {
+		return new Resolution(Resolution.IntervalUnit.YEAR, YearMonthIntervalType.DEFAULT_PRECISION);
+	}
+
+	/**
+	 * Field definition with field name and data type.
+	 */
+	public static Field FIELD(String name, DataType dataType) {
+		return new Field(
+			Preconditions.checkNotNull(name, "Field name must not be null."),
+			Preconditions.checkNotNull(dataType, "Field data type must not be null."),
+			null);
+	}
+
+	/**
+	 * Field definition with field name, data type, and a description.
+	 */
+	public static Field FIELD(String name, DataType dataType, String description) {
+		return new Field(
+			Preconditions.checkNotNull(name, "Field name must not be null."),
+			Preconditions.checkNotNull(dataType, "Field data type must not be null."),
+			Preconditions.checkNotNull(description, "Field description must not be null."));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper class for defining the resolution of an interval.
+	 */
+	public static final class Resolution {
+
+		private static final int EMPTY_PRECISION = -1;
+
+		private enum IntervalUnit {
+			SECOND,
+			MINUTE,
+			HOUR,
+			DAY,
+			MONTH,
+			YEAR
+		}
+
+		private static final Map<List<IntervalUnit>, BiFunction<Integer, Integer, LogicalType>> resolutionMapping = new HashMap<>();
+		static {
+			addResolutionMapping(
+				IntervalUnit.YEAR, null,
+				(p1, p2) -> new YearMonthIntervalType(YearMonthResolution.YEAR, p1));
+			addResolutionMapping(
+				IntervalUnit.MONTH, null,
+				(p1, p2) -> new YearMonthIntervalType(YearMonthResolution.MONTH));
+			addResolutionMapping(
+				IntervalUnit.YEAR, IntervalUnit.MONTH,
+				(p1, p2) -> new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, p1));
+			addResolutionMapping(
+				IntervalUnit.DAY, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+			addResolutionMapping(
+				IntervalUnit.DAY, IntervalUnit.HOUR,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_HOUR, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+			addResolutionMapping(
+				IntervalUnit.DAY, IntervalUnit.MINUTE,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_MINUTE, p1, DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION));
+			addResolutionMapping(
+				IntervalUnit.DAY, IntervalUnit.SECOND,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, p1, p2));
+			addResolutionMapping(
+				IntervalUnit.HOUR, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR));
+			addResolutionMapping(
+				IntervalUnit.HOUR, IntervalUnit.MINUTE,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR_TO_MINUTE));
+			addResolutionMapping(
+				IntervalUnit.HOUR, IntervalUnit.SECOND,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.HOUR_TO_SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p2));
+			addResolutionMapping(
+				IntervalUnit.MINUTE, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.MINUTE));
+			addResolutionMapping(
+				IntervalUnit.MINUTE, IntervalUnit.SECOND,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.MINUTE_TO_SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p2));
+			addResolutionMapping(
+				IntervalUnit.SECOND, null,
+				(p1, p2) -> new DayTimeIntervalType(DayTimeResolution.SECOND, DayTimeIntervalType.DEFAULT_DAY_PRECISION, p1));
+		}
+
+		private static void addResolutionMapping(
+				IntervalUnit leftUnit,
+				IntervalUnit rightUnit,
+				BiFunction<Integer, Integer, LogicalType> typeProvider) {
+			resolutionMapping.put(Arrays.asList(leftUnit, rightUnit), typeProvider);
+		}
+
+		private static LogicalType resolveInterval(Resolution fromResolution, @Nullable Resolution toResolution) {
+			final IntervalUnit toBoundary = toResolution == null ? null : toResolution.unit;
+			final int toPrecision = toResolution == null ? EMPTY_PRECISION : toResolution.precision;
+
+			final BiFunction<Integer, Integer, LogicalType> typeProvider = resolutionMapping.get(
+				Arrays.asList(fromResolution.unit, toBoundary));
+			if (typeProvider == null) {
+				throw new ValidationException(String.format("Unsupported interval definition '%s TO %s'. " +
+					"Please check the documentation for supported combinations for year-month and day-time intervals.",
+					fromResolution.unit,
+					toBoundary));
+			}
+			return typeProvider.apply(
+				fromResolution.precision,
+				toPrecision);
+		}
+
+		private final int precision;
+
+		private final IntervalUnit unit;
+
+		private Resolution(IntervalUnit unit, int precision) {
+			this.unit = unit;
+			this.precision = precision;
+		}
+
+		private Resolution(IntervalUnit unit) {
+			this(unit, EMPTY_PRECISION);
+		}
+
+		@Override
+		public String toString() {
+			if (precision != EMPTY_PRECISION) {
+				return String.format("%s(%d)", unit, precision);
+			}
+			return unit.toString();
+		}
+	}
+
+	/**
+	 * Helper class for defining the field of a row or structured type.
+	 */
+	public static final class Field {
+
+		private final String name;
+
+		private final DataType dataType;
+
+		private final @Nullable String description;
+
+		private Field(String name, DataType dataType, String description) {
+			this.name = name;
+			this.dataType = dataType;
+			this.description = description;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public DataType getDataType() {
+			return dataType;
+		}
+
+		public Optional<String> getDescription() {
+			return Optional.ofNullable(description);
+		}
+	}
+
+	private DataTypes() {
+		// no instances
+	}
+
+	// CHECKSTYLE.ON: MethodName
+}
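
For orientation only (not part of this commit): a minimal sketch of how the
factory methods and helpers above compose. The static imports and variable
names are assumptions for illustration.

    import static org.apache.flink.table.api.DataTypes.*;

    // interval types are built from one resolution, or an upper and a lower one
    DataType dayToSecond = INTERVAL(DAY(2), SECOND(6)); // INTERVAL DAY(2) TO SECOND(6)
    DataType yearToMonth = INTERVAL(YEAR(4), MONTH());  // INTERVAL YEAR(4) TO MONTH

    // row types are built from named (and optionally described) fields
    DataType row = ROW(
        FIELD("id", BIGINT()),
        FIELD("name", STRING(), "the display name"));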
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
new file mode 100644
index 0000000..aea5d6b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/AtomicDataType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A data type that does not contain further data types (e.g. {@code INT} or {@code BOOLEAN}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class AtomicDataType extends DataType {
+
+	public AtomicDataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
+		super(logicalType, conversionClass);
+	}
+
+	public AtomicDataType(LogicalType logicalType) {
+		super(logicalType, null);
+	}
+
+	@Override
+	public DataType notNull() {
+		return new AtomicDataType(
+			logicalType.copy(false),
+			conversionClass);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new AtomicDataType(
+			logicalType.copy(true),
+			conversionClass);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new AtomicDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."));
+	}
+}
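
A hypothetical usage sketch (not part of this commit) of the fluent methods
above on an atomic type:

    DataType t = DataTypes.TIMESTAMP(3)        // TIMESTAMP(3), nullable by default
        .notNull()                             // TIMESTAMP(3) NOT NULL
        .bridgedTo(java.sql.Timestamp.class);  // physical hint for the edges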
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
new file mode 100644
index 0000000..2865296
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Array;
+import java.util.Objects;
+
+/**
+ * A data type that contains an element type (e.g. {@code ARRAY} or {@code MULTISET}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class CollectionDataType extends DataType {
+
+	private final DataType elementDataType;
+
+	public CollectionDataType(
+			LogicalType logicalType,
+			@Nullable Class<?> conversionClass,
+			DataType elementDataType) {
+		super(logicalType, conversionClass);
+		this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
+	}
+
+	public CollectionDataType(
+			LogicalType logicalType,
+			DataType elementDataType) {
+		this(logicalType, null, elementDataType);
+	}
+
+	public DataType getElementDataType() {
+		return elementDataType;
+	}
+
+	@Override
+	public DataType notNull() {
+		return new CollectionDataType(
+			logicalType.copy(false),
+			conversionClass,
+			elementDataType);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new CollectionDataType(
+			logicalType.copy(true),
+			conversionClass,
+			elementDataType);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new CollectionDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+			elementDataType);
+	}
+
+	@Override
+	public Class<?> getConversionClass() {
+		// arrays are a special case because their default conversion class depends on the
+		// conversion class of the element type
+		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) {
+			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+		}
+		return super.getConversionClass();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		CollectionDataType that = (CollectionDataType) o;
+		return elementDataType.equals(that.elementDataType);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), elementDataType);
+	}
+}
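
Illustration only (assumed names): because of the overridden getConversionClass(),
an array's default conversion class follows the conversion class of its elements.

    DataType primitive = DataTypes.ARRAY(DataTypes.INT().notNull().bridgedTo(int.class));
    primitive.getConversionClass(); // int[].class

    DataType boxed = DataTypes.ARRAY(DataTypes.INT());
    boxed.getConversionClass();     // Integer[].class (nullable elements stay boxed)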
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
new file mode 100644
index 0000000..1ba654e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Describes the data type of a value in the table ecosystem. Instances of this class can be used to
+ * declare input and/or output types of operations.
+ *
+ * <p>The {@link DataType} class has two responsibilities: declaring a logical type and giving hints
+ * about the physical representation of data to the optimizer. While the logical type is mandatory,
+ * hints are optional but useful at the edges to other APIs.
+ *
+ * <p>The logical type is independent of any physical representation and is close to the "data type"
+ * terminology of the SQL standard. See {@link org.apache.flink.table.types.logical.LogicalType} and
+ * its subclasses for more information about available logical types and their properties.
+ *
+ * <p>Physical hints are required at the edges of the table ecosystem. Hints indicate the data format
+ * that an implementation expects. For example, a data source could express that it produces values for
+ * logical timestamps using a {@link java.sql.Timestamp} class instead of using {@link java.time.LocalDateTime}.
+ * With this information, the runtime is able to convert the produced class into its internal data
+ * format. In return, a data sink can declare the data format it consumes from the runtime.
+ *
+ * @see DataTypes for a list of supported data types and instances of this class.
+ */
+@PublicEvolving
+public abstract class DataType implements Serializable {
+
+	protected LogicalType logicalType;
+
+	protected @Nullable Class<?> conversionClass;
+
+	DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
+		this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null.");
+		this.conversionClass = performEarlyClassValidation(logicalType, conversionClass);
+	}
+
+	/**
+	 * Returns the corresponding logical type.
+	 *
+	 * @return a parameterized instance of {@link LogicalType}
+	 */
+	public LogicalType getLogicalType() {
+		return logicalType;
+	}
+
+	/**
+	 * Returns the corresponding conversion class for representing values. If no conversion class was
+	 * defined manually, the default conversion defined by the logical type is used.
+	 *
+	 * @see LogicalType#getDefaultConversion()
+	 *
+	 * @return the expected conversion class
+	 */
+	public Class<?> getConversionClass() {
+		if (conversionClass == null) {
+			return logicalType.getDefaultConversion();
+		}
+		return conversionClass;
+	}
+
+	/**
+	 * Adds a hint that null values are not expected in the data for this type.
+	 *
+	 * @return a new, reconfigured data type instance
+	 */
+	public abstract DataType notNull();
+
+	/**
+	 * Adds a hint that null values are expected in the data for this type (default behavior).
+	 *
+	 * <p>This method exists for explicit declaration of the default behavior or for invalidation of
+	 * a previous call to {@link #notNull()}.
+	 *
+	 * @return a new, reconfigured data type instance
+	 */
+	public abstract DataType nullable();
+
+	/**
+	 * Adds a hint that data should be represented using the given class when entering or leaving
+	 * the table ecosystem.
+	 *
+	 * <p>A supported conversion class depends on the logical type and its nullability property.
+	 *
+	 * <p>Please see the implementation of {@link LogicalType#supportsInputConversion(Class)},
+	 * {@link LogicalType#supportsOutputConversion(Class)}, or the documentation for more information
+	 * about supported conversions.
+	 *
+	 * @return a new, reconfigured data type instance
+	 */
+	public abstract DataType bridgedTo(Class<?> newConversionClass);
+
+	@Override
+	public String toString() {
+		return logicalType.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		DataType dataType = (DataType) o;
+		return logicalType.equals(dataType.logicalType) &&
+			Objects.equals(conversionClass, dataType.conversionClass);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(logicalType, conversionClass);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This method should catch the most common errors. However, further validation is required in
+	 * deeper layers as we do not know whether the data type is used for input or output declaration.
+	 */
+	private static <C> Class<C> performEarlyClassValidation(
+			LogicalType logicalType,
+			Class<C> candidate) {
+
+		if (candidate != null &&
+				!logicalType.supportsInputConversion(candidate) &&
+				!logicalType.supportsOutputConversion(candidate)) {
+			throw new ValidationException(
+				String.format(
+					"Logical type '%s' does not support a conversion from or to class '%s'.",
+					logicalType.asSummaryString(),
+					candidate.getName()));
+		}
+		return candidate;
+	}
+}
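
A hedged sketch (not part of this commit) of what performEarlyClassValidation
accepts and rejects; the rejected class is an arbitrary example.

    DataTypes.TIMESTAMP(0).bridgedTo(java.sql.Timestamp.class); // OK: supported conversion
    DataTypes.TIMESTAMP(0).bridgedTo(Thread.class);             // throws ValidationException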
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
new file mode 100644
index 0000000..c08fe58
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/FieldsDataType.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A data type that contains field data types (e.g. {@code ROW} or structured types).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class FieldsDataType extends DataType {
+
+	private final Map<String, DataType> fieldDataTypes;
+
+	public FieldsDataType(
+			LogicalType logicalType,
+			@Nullable Class<?> conversionClass,
+			Map<String, DataType> fieldDataTypes) {
+		super(logicalType, conversionClass);
+		this.fieldDataTypes = Collections.unmodifiableMap(
+			new HashMap<>(
+				Preconditions.checkNotNull(fieldDataTypes, "Field data types must not be null.")));
+	}
+
+	public FieldsDataType(
+			LogicalType logicalType,
+			Map<String, DataType> fieldDataTypes) {
+		this(logicalType, null, fieldDataTypes);
+	}
+
+	public Map<String, DataType> getFieldDataTypes() {
+		return fieldDataTypes;
+	}
+
+	@Override
+	public DataType notNull() {
+		return new FieldsDataType(
+			logicalType.copy(false),
+			conversionClass,
+			fieldDataTypes);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new FieldsDataType(
+			logicalType.copy(true),
+			conversionClass,
+			fieldDataTypes);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new FieldsDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+			fieldDataTypes);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		FieldsDataType that = (FieldsDataType) o;
+		return fieldDataTypes.equals(that.fieldDataTypes);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), fieldDataTypes);
+	}
+}
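
Illustration only: field data types are looked up by field name. The cast
mirrors the one used in DataTypeTest below.

    FieldsDataType row = (FieldsDataType) DataTypes.ROW(
        DataTypes.FIELD("f0", DataTypes.CHAR(2)),
        DataTypes.FIELD("f1", DataTypes.BOOLEAN()));
    DataType f0 = row.getFieldDataTypes().get("f0"); // CHAR(2)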
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
new file mode 100644
index 0000000..0b345fd
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/KeyValueDataType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A data type that contains a key and value data type (e.g. {@code MAP}).
+ *
+ * @see DataTypes for a list of supported data types
+ */
+@PublicEvolving
+public final class KeyValueDataType extends DataType {
+
+	private final DataType keyDataType;
+
+	private final DataType valueDataType;
+
+	public KeyValueDataType(
+			LogicalType logicalType,
+			@Nullable Class<?> conversionClass,
+			DataType keyDataType,
+			DataType valueDataType) {
+		super(logicalType, conversionClass);
+		this.keyDataType = Preconditions.checkNotNull(keyDataType, "Key data type must not be null.");
+		this.valueDataType = Preconditions.checkNotNull(valueDataType, "Value data type must not be null.");
+	}
+
+	public KeyValueDataType(
+			LogicalType logicalType,
+			DataType keyDataType,
+			DataType valueDataType) {
+		this(logicalType, null, keyDataType, valueDataType);
+	}
+
+	public DataType getKeyDataType() {
+		return keyDataType;
+	}
+
+	public DataType getValueDataType() {
+		return valueDataType;
+	}
+
+	@Override
+	public DataType notNull() {
+		return new KeyValueDataType(
+			logicalType.copy(false),
+			conversionClass,
+			keyDataType,
+			valueDataType);
+	}
+
+	@Override
+	public DataType nullable() {
+		return new KeyValueDataType(
+			logicalType.copy(true),
+			conversionClass,
+			keyDataType,
+			valueDataType);
+	}
+
+	@Override
+	public DataType bridgedTo(Class<?> newConversionClass) {
+		return new KeyValueDataType(
+			logicalType,
+			Preconditions.checkNotNull(newConversionClass, "New conversion class must not be null."),
+			keyDataType,
+			valueDataType);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		KeyValueDataType that = (KeyValueDataType) o;
+		return keyDataType.equals(that.keyDataType) && valueDataType.equals(that.valueDataType);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), keyDataType, valueDataType);
+	}
+}
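
Illustration only (assumed variable names): key and value data types travel
with the MAP type and can be inspected independently.

    KeyValueDataType map = (KeyValueDataType) DataTypes.MAP(
        DataTypes.INT(),
        DataTypes.STRING());
    map.getKeyDataType();   // INT
    map.getValueDataType(); // VARCHAR(2147483647), i.e. STRING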
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
new file mode 100644
index 0000000..2475db8
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.YEAR;
+import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
+import static org.apache.flink.table.types.TypeTestingUtils.hasNullability;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DataType}.
+ */
+public class DataTypeTest {
+
+	@Test
+	public void testNullability() {
+		assertThat(
+			BIGINT().nullable(),
+			hasNullability(true));
+
+		assertThat(
+			BIGINT().notNull(),
+			hasNullability(false));
+
+		assertThat(
+			BIGINT().notNull().nullable(),
+			hasNullability(true));
+	}
+
+	@Test
+	public void testAtomicConversion() {
+		assertThat(
+			TIMESTAMP(0).bridgedTo(java.sql.Timestamp.class),
+			hasConversionClass(java.sql.Timestamp.class));
+	}
+
+	@Test
+	public void testTolerantAtomicConversion() {
+		// this is logically only supported as input type because of
+		// nullability but is tolerated until the planner complains
+		// about an output type
+		assertThat(
+			BIGINT().nullable().bridgedTo(long.class),
+			hasConversionClass(long.class));
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidAtomicConversion() {
+		TIMESTAMP(0).bridgedTo(DataTypesTest.class);
+	}
+
+	@Test
+	public void testArrayElementConversion() {
+		assertThat(
+			ARRAY(ARRAY(INT().notNull().bridgedTo(int.class))),
+			hasConversionClass(int[][].class));
+	}
+
+	@Test
+	public void testTolerantArrayConversion() {
+		// this is logically only supported as input type because of
+		// nullability but is tolerated until the planner complains
+		// about an output type
+		assertThat(
+			ARRAY(ARRAY(INT().nullable())).bridgedTo(int[][].class),
+			hasConversionClass(int[][].class));
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidArrayConversion() {
+		ARRAY(ARRAY(INT())).bridgedTo(int[][][].class);
+	}
+
+	@Test
+	public void testTolerantMapConversion() {
+		// this doesn't make much sense logically but is supported until the planner complains
+		assertThat(
+			MULTISET(MULTISET(INT().bridgedTo(int.class))),
+			hasConversionClass(Map.class));
+	}
+
+	@Test
+	public void testFields() {
+		final DataType rowDataType = ROW(FIELD("field1", CHAR(2)), FIELD("field2", BOOLEAN()));
+
+		final Map<String, DataType> fields = new HashMap<>();
+		fields.put("field1", CHAR(2));
+		fields.put("field2", BOOLEAN());
+		assertEquals(fields, ((FieldsDataType) rowDataType).getFieldDataTypes());
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testInvalidOrderInterval() {
+		INTERVAL(MONTH(), YEAR(2));
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
new file mode 100644
index 0000000..a1f4289
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.ANY;
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.CHAR;
+import static org.apache.flink.table.api.DataTypes.DATE;
+import static org.apache.flink.table.api.DataTypes.DECIMAL;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.INTERVAL;
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.MINUTE;
+import static org.apache.flink.table.api.DataTypes.MONTH;
+import static org.apache.flink.table.api.DataTypes.MULTISET;
+import static org.apache.flink.table.api.DataTypes.NULL;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SECOND;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TIME;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_TIME_ZONE;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARBINARY;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
+import static org.apache.flink.table.types.TypeTestingUtils.hasLogicalType;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DataTypes}.
+ */
+@RunWith(Parameterized.class)
+public class DataTypesTest {
+
+	@Parameters(name = "{index}: {0}=[Logical: {1}, Class: {2}]")
+	public static List<Object[]> dataTypes() {
+		return Arrays.asList(
+			new Object[][]{
+				{CHAR(2), new CharType(2), String.class},
+
+				{VARCHAR(2), new VarCharType(2), String.class},
+
+				{STRING(), new VarCharType(VarCharType.MAX_LENGTH), String.class},
+
+				{BOOLEAN(), new BooleanType(), Boolean.class},
+
+				{BINARY(42), new BinaryType(42), byte[].class},
+
+				{VARBINARY(42), new VarBinaryType(42), byte[].class},
+
+				{BYTES(), new VarBinaryType(VarBinaryType.MAX_LENGTH), byte[].class},
+
+				{DECIMAL(10, 10), new DecimalType(10, 10), BigDecimal.class},
+
+				{TINYINT(), new TinyIntType(), Byte.class},
+
+				{SMALLINT(), new SmallIntType(), Short.class},
+
+				{INT(), new IntType(), Integer.class},
+
+				{BIGINT(), new BigIntType(), Long.class},
+
+				{FLOAT(), new FloatType(), Float.class},
+
+				{DOUBLE(), new DoubleType(), Double.class},
+
+				{DATE(), new DateType(), java.time.LocalDate.class},
+
+				{TIME(3), new TimeType(3), java.time.LocalTime.class},
+
+				{TIMESTAMP(3), new TimestampType(3), java.time.LocalDateTime.class},
+
+				{TIMESTAMP_WITH_TIME_ZONE(3),
+					new ZonedTimestampType(3),
+					java.time.OffsetDateTime.class},
+
+				{TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
+					new LocalZonedTimestampType(3),
+					java.time.Instant.class},
+
+				{INTERVAL(MINUTE(), SECOND(3)),
+					new DayTimeIntervalType(MINUTE_TO_SECOND, DEFAULT_DAY_PRECISION, 3),
+					java.time.Duration.class},
+
+				{INTERVAL(MONTH()),
+					new YearMonthIntervalType(YearMonthResolution.MONTH),
+					java.time.Period.class},
+
+				{ARRAY(ARRAY(INT())),
+					new ArrayType(new ArrayType(new IntType())),
+					Integer[][].class},
+
+				{MULTISET(MULTISET(INT())),
+					new MultisetType(new MultisetType(new IntType())),
+					Map.class},
+
+				{MAP(INT(), SMALLINT()),
+					new MapType(new IntType(), new SmallIntType()),
+					Map.class},
+
+				{ROW(FIELD("field1", CHAR(2)), FIELD("field2", BOOLEAN())),
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("field1", new CharType(2)),
+							new RowType.RowField("field2", new BooleanType()))),
+					Row.class},
+
+				{NULL(), new NullType(), Object.class},
+
+				{ANY(Types.GENERIC(DataTypesTest.class)),
+					new TypeInformationAnyType<>(Types.GENERIC(DataTypesTest.class)),
+					DataTypesTest.class},
+
+				{ANY(Void.class, VoidSerializer.INSTANCE),
+					new AnyType<>(Void.class, VoidSerializer.INSTANCE),
+					Void.class}
+			}
+		);
+	}
+
+	@Parameter
+	public DataType dataType;
+
+	@Parameter(1)
+	public LogicalType expectedLogicalType;
+
+	@Parameter(2)
+	public Class<?> expectedConversionClass;
+
+	@Test
+	public void testLogicalType() {
+		assertThat(dataType, hasLogicalType(expectedLogicalType));
+	}
+
+	@Test
+	public void testConversionClass() {
+		assertThat(dataType, hasConversionClass(expectedConversionClass));
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java
new file mode 100644
index 0000000..16fb4d0
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/TypeTestingUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+
+/**
+ * Utilities for testing types.
+ */
+public class TypeTestingUtils {
+
+	public static Matcher<DataType> hasLogicalType(LogicalType logicalType) {
+		return new FeatureMatcher<DataType, LogicalType>(
+				CoreMatchers.equalTo(logicalType),
+				"logical type of the data type",
+				"logical type") {
+
+			@Override
+			protected LogicalType featureValueOf(DataType actual) {
+				return actual.getLogicalType();
+			}
+		};
+	}
+
+	public static Matcher<DataType> hasConversionClass(Class<?> clazz) {
+		return new FeatureMatcher<DataType, Class<?>>(
+				CoreMatchers.equalTo(clazz),
+				"conversion class of the data type",
+				"conversion class") {
+
+			@Override
+			protected Class<?> featureValueOf(DataType actual) {
+				return actual.getConversionClass();
+			}
+		};
+	}
+
+	public static Matcher<DataType> hasNullability(boolean isNullable) {
+		return new FeatureMatcher<DataType, Boolean>(
+				CoreMatchers.equalTo(isNullable),
+				"nullability of the data type",
+				"nullability") {
+
+			@Override
+			protected Boolean featureValueOf(DataType actual) {
+				return actual.getLogicalType().isNullable();
+			}
+		};
+	}
+}
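
Illustration only: the matchers plug into JUnit's assertThat, as the tests
above do (static import of org.junit.Assert.assertThat assumed).

    assertThat(DataTypes.BIGINT().notNull(), TypeTestingUtils.hasNullability(false));
    assertThat(DataTypes.STRING(), TypeTestingUtils.hasConversionClass(String.class));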


[flink] 06/06: [hotfix][table-common] Remove COLLECTION family from MAP type root

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb13989d30f2b87565572c21f173255953d1bac2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu May 9 09:36:45 2019 +0200

    [hotfix][table-common] Remove COLLECTION family from MAP type root
---
 .../main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java  | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
index d17443b..322ae93 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -137,7 +137,6 @@ public enum LogicalTypeRoot {
 
 	MAP(
 		LogicalTypeFamily.CONSTRUCTED,
-		LogicalTypeFamily.COLLECTION,
 		LogicalTypeFamily.EXTENSION),
 
 	ROW(