You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:37:04 UTC

[flink] branch release-1.9 updated (66edf7d -> b292210)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 66edf7d  [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
     new 3c443a0  [hotfix][table-common] Fix minor typos in ObjectIdentifier
     new 15ef025  [hotfix][table-common] Update list of synonyms for logical types
     new d05b436  [hotfix][table-common] Link timestamp precisions
     new e750efc  [FLINK-13078][table-common] Simplify serializable string representation for parsers
     new e5d4ec0  [FLINK-13078][table-common] Add an unresolved user-defined logical type
     new b292210  [FLINK-13078][table-common] Add a logical type parser

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/catalog/ObjectIdentifier.java      |   8 +-
 .../apache/flink/table/types/logical/AnyType.java  |   4 +-
 .../flink/table/types/logical/DecimalType.java     |   3 +-
 .../flink/table/types/logical/DoubleType.java      |   3 +-
 .../apache/flink/table/types/logical/IntType.java  |   2 +-
 .../types/logical/LocalZonedTimestampType.java     |   6 +-
 .../flink/table/types/logical/LogicalType.java     |   3 +
 .../flink/table/types/logical/LogicalTypeRoot.java |   3 +
 .../table/types/logical/LogicalTypeVisitor.java    |   4 +-
 .../apache/flink/table/types/logical/RowType.java  |   2 +-
 .../flink/table/types/logical/SymbolType.java      |   2 +-
 .../apache/flink/table/types/logical/TimeType.java |   3 +-
 .../flink/table/types/logical/TimestampType.java   |   3 +-
 .../types/logical/TypeInformationAnyType.java      |   2 +-
 .../types/logical/UnresolvedUserDefinedType.java   | 153 ++++
 .../table/types/logical/ZonedTimestampType.java    |   6 +-
 .../types/logical/utils/LogicalTypeParser.java     | 900 +++++++++++++++++++++
 .../apache/flink/table/utils/EncodingUtils.java    |   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 ++++++++++++
 .../apache/flink/table/types/LogicalTypesTest.java |  23 +-
 21 files changed, 1636 insertions(+), 31 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java


[flink] 03/06: [hotfix][table-common] Link timestamp precisions

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d05b436024888e554033a64839056f1b4dfbfcb6
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:44:08 2019 +0200

    [hotfix][table-common] Link timestamp precisions
---
 .../apache/flink/table/types/logical/LocalZonedTimestampType.java   | 6 +++---
 .../org/apache/flink/table/types/logical/ZonedTimestampType.java    | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
index d0c237c..90a8ed9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
@@ -53,11 +53,11 @@ import java.util.Set;
 @PublicEvolving
 public final class LocalZonedTimestampType extends LogicalType {
 
-	public static final int MIN_PRECISION = 0;
+	public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
 
-	public static final int MAX_PRECISION = 9;
+	public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
 
-	public static final int DEFAULT_PRECISION = 6;
+	public static final int DEFAULT_PRECISION = TimestampType.DEFAULT_PRECISION;
 
 	private static final String FORMAT = "TIMESTAMP(%d) WITH LOCAL TIME ZONE";
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
index 22e5538..d6a4464 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
@@ -49,11 +49,11 @@ import java.util.Set;
 @PublicEvolving
 public final class ZonedTimestampType extends LogicalType {
 
-	public static final int MIN_PRECISION = 0;
+	public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
 
-	public static final int MAX_PRECISION = 9;
+	public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
 
-	public static final int DEFAULT_PRECISION = 6;
+	public static final int DEFAULT_PRECISION = TimestampType.DEFAULT_PRECISION;
 
 	private static final String FORMAT = "TIMESTAMP(%d) WITH TIME ZONE";
 


[flink] 02/06: [hotfix][table-common] Update list of synonyms for logical types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 15ef025bd387716b1a2259ea238bd1222623af76
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:43:18 2019 +0200

    [hotfix][table-common] Update list of synonyms for logical types
---
 .../main/java/org/apache/flink/table/types/logical/DecimalType.java    | 3 ++-
 .../src/main/java/org/apache/flink/table/types/logical/DoubleType.java | 3 ++-
 .../src/main/java/org/apache/flink/table/types/logical/IntType.java    | 2 +-
 .../src/main/java/org/apache/flink/table/types/logical/RowType.java    | 2 +-
 .../src/main/java/org/apache/flink/table/types/logical/TimeType.java   | 3 ++-
 .../main/java/org/apache/flink/table/types/logical/TimestampType.java  | 3 ++-
 6 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
index c11ede0..e91793d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
@@ -34,7 +34,8 @@ import java.util.Set;
  * digits in a number (=precision) and {@code s} is the number of digits to the right of the decimal
  * point in a number (=scale). {@code p} must have a value between 1 and 38 (both inclusive). {@code s}
  * must have a value between 0 and {@code p} (both inclusive). The default value for {@code p} is 10.
- * The default value for {@code s} is 0.
+ * The default value for {@code s} is 0. {@code NUMERIC(p, s)} and {@code DEC(p, s)} are synonyms for
+ * this type.
  */
 @PublicEvolving
 public final class DecimalType extends LogicalType {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
index 90d1429..ea4f5b4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
@@ -27,7 +27,8 @@ import java.util.Set;
 /**
  * Logical type of an 8-byte double precision floating point number.
  *
- * <p>The serialized string representation is {@code DOUBLE}.
+ * <p>The serialized string representation is {@code DOUBLE}. {@code DOUBLE PRECISION} is a synonym
+ * for this type.
  */
 @PublicEvolving
 public final class DoubleType extends LogicalType {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
index 567d8ef..f362b36 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
@@ -27,7 +27,7 @@ import java.util.Set;
 /**
  * Logical type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
  *
- * <p>The serialized string representation is {@code INT}.
+ * <p>The serialized string representation is {@code INT}. {@code INTEGER} is a synonym for this type.
  */
 @PublicEvolving
 public final class IntType extends LogicalType {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
index 5a95e7b..c3a5cd1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -47,7 +47,7 @@ import static org.apache.flink.table.utils.EncodingUtils.escapeSingleQuotes;
  *
  * <p>The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
  * {@code n} is the unique name of a field, {@code t} is the logical type of a field, {@code d} is
- * the description of a field.
+ * the description of a field. {@code ROW(...)} is a synonym for being closer to the SQL standard.
  */
 @PublicEvolving
 public final class RowType extends LogicalType {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
index 15763d3..239eed5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
@@ -35,7 +35,8 @@ import java.util.Set;
  *
  * <p>The serialized string representation is {@code TIME(p)} where {@code p} is the number of digits
  * of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both inclusive).
- * If no precision is specified, {@code p} is equal to 0.
+ * If no precision is specified, {@code p} is equal to 0. {@code TIME(p) WITHOUT TIME ZONE} is a synonym
+ * for this type.
  *
  * <p>A conversion from and to {@code int} describes the number of milliseconds of the day. A
  * conversion from and to {@code long} describes the number of nanoseconds of the day.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
index c796f7d..87f062d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimestampType.java
@@ -35,7 +35,8 @@ import java.util.Set;
  *
  * <p>The serialized string representation is {@code TIMESTAMP(p)} where {@code p} is the number of
  * digits of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both inclusive).
- * If no precision is specified, {@code p} is equal to 6.
+ * If no precision is specified, {@code p} is equal to 6. {@code TIMESTAMP(p) WITHOUT TIME ZONE} is a
+ * synonym for this type.
  *
  * <p>A conversion from and to {@code long} is not supported as this would imply a time zone. However,
  * this type is time zone free. For more {@link java.time.Instant}-like semantics use


[flink] 05/06: [FLINK-13078][table-common] Add an unresolved user-defined logical type

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e5d4ec09bfa635590b1a5d506a3b6dee714cbc62
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:51:22 2019 +0200

    [FLINK-13078][table-common] Add an unresolved user-defined logical type
---
 .../flink/table/types/logical/LogicalTypeRoot.java |   3 +
 .../table/types/logical/LogicalTypeVisitor.java    |   4 +-
 .../types/logical/UnresolvedUserDefinedType.java   | 153 +++++++++++++++++++++
 .../apache/flink/table/types/LogicalTypesTest.java |  11 ++
 4 files changed, 169 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
index 4725876..6ff360a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -156,6 +156,9 @@ public enum LogicalTypeRoot {
 		LogicalTypeFamily.EXTENSION),
 
 	SYMBOL(
+		LogicalTypeFamily.EXTENSION),
+
+	UNRESOLVED(
 		LogicalTypeFamily.EXTENSION);
 
 	private final Set<LogicalTypeFamily> families;
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 5bf5858..810174f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,8 +24,8 @@ import org.apache.flink.annotation.PublicEvolving;
  * The visitor definition of {@link LogicalType}. The visitor transforms a logical type into
  * instances of {@code R}.
  *
- * <p>Incomplete types such as the {@link TypeInformationAnyType} are visited through the generic
- * {@link #visit(LogicalType)}.
+ * <p>Incomplete types such as the {@link TypeInformationAnyType} or {@link UnresolvedUserDefinedType} are visited
+ * through the generic {@link #visit(LogicalType)}.
  *
  * @param <R> result type
  */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
new file mode 100644
index 0000000..2168c8e
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Placeholder type of an unresolved user-defined type that is identified by a partially or fully
+ * qualified path ({@code [catalog].[database].[type]}).
+ *
+ * <p>It assumes that a type has been registered in a catalog and just needs to be resolved to a
+ * {@link DistinctType} or {@link StructuredType}.
+ *
+ * <p>Two unresolved types are considered equal if they share the same path in a stable session context.
+ *
+ * @see UserDefinedType
+ */
+@PublicEvolving
+public final class UnresolvedUserDefinedType extends LogicalType {
+
+	private final @Nullable String catalog;
+
+	private final @Nullable String database;
+
+	private final String typeIdentifier;
+
+	public UnresolvedUserDefinedType(
+			boolean isNullable,
+			@Nullable String catalog,
+			@Nullable String database,
+			String typeIdentifier) {
+		super(isNullable, LogicalTypeRoot.UNRESOLVED);
+		this.catalog = catalog;
+		this.database = database;
+		this.typeIdentifier = Preconditions.checkNotNull(
+			typeIdentifier,
+			"Type identifier must not be null.");
+	}
+
+	public UnresolvedUserDefinedType(
+			@Nullable String catalog,
+			@Nullable String database,
+			String typeIdentifier) {
+		this(true, catalog, database, typeIdentifier);
+	}
+
+	public Optional<String> getCatalog() {
+		return Optional.ofNullable(catalog);
+	}
+
+	public Optional<String> getDatabase() {
+		return Optional.ofNullable(database);
+	}
+
+	public String getTypeIdentifier() {
+		return typeIdentifier;
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new UnresolvedUserDefinedType(isNullable, catalog, database, typeIdentifier);
+	}
+
+	@Override
+	public String asSummaryString() {
+		final String path = Stream.of(catalog, database, typeIdentifier)
+			.filter(Objects::nonNull)
+			.map(EncodingUtils::escapeIdentifier)
+			.collect(Collectors.joining("."));
+		return withNullability(path);
+	}
+
+	@Override
+	public String asSerializableString() {
+		throw new TableException(
+			"An unresolved user-defined type has no serializable string representation. It " +
+				"needs to be resolved into a proper user-defined type.");
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		throw new TableException("An unresolved user-defined type does not support any input conversion.");
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		throw new TableException("An unresolved user-defined type does not support any output conversion.");
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		throw new TableException("An unresolved user-defined type has no default conversion.");
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		throw new TableException("An unresolved user-defined type cannot return children.");
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		UnresolvedUserDefinedType that = (UnresolvedUserDefinedType) o;
+		return Objects.equals(catalog, that.catalog) &&
+			Objects.equals(database, that.database) &&
+			typeIdentifier.equals(that.typeIdentifier);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), catalog, database, typeIdentifier);
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index 49feb47..6bbbc80 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
@@ -610,6 +611,16 @@ public class LogicalTypesTest {
 	}
 
 	@Test
+	public void testUnresolvedUserDefinedType() {
+		final UnresolvedUserDefinedType unresolvedType =
+			new UnresolvedUserDefinedType("catalog", "database", "Type");
+
+		testEquality(unresolvedType, new UnresolvedUserDefinedType("different", "database", "Type"));
+
+		testStringSummary(unresolvedType, "`catalog`.`database`.`Type`");
+	}
+
+	@Test
 	public void testEmptyStringLiterals() {
 		final CharType charType = CharType.ofEmptyLiteral();
 		final VarCharType varcharType = VarCharType.ofEmptyLiteral();


[flink] 01/06: [hotfix][table-common] Fix minor typos in ObjectIdentifier

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c443a0ea4da5103ebe4abe26f1da2b639a8206b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:40:47 2019 +0200

    [hotfix][table-common] Fix minor typos in ObjectIdentifier
---
 .../java/org/apache/flink/table/catalog/ObjectIdentifier.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
index b791a70..6e2f9c7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
@@ -33,15 +33,15 @@ import static org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
  * <p>While {@link ObjectPath} is used within the same catalog, instances of this class can be used
  * across catalogs.
  *
- * <p>Two objects are considered equal if they share the same type identifier in a stable session context.
+ * <p>Two objects are considered equal if they share the same object identifier in a stable session context.
  */
 public final class ObjectIdentifier implements Serializable {
 
-	private String catalogName;
+	private final String catalogName;
 
-	private String databaseName;
+	private final String databaseName;
 
-	private String objectName;
+	private final String objectName;
 
 	public static ObjectIdentifier of(String catalogName, String databaseName, String objectName) {
 		return new ObjectIdentifier(catalogName, databaseName, objectName);


[flink] 04/06: [FLINK-13078][table-common] Simplify serializable string representation for parsers

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e750efc0a84c039f42f1edc81257f9fab0de03d9
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:46:29 2019 +0200

    [FLINK-13078][table-common] Simplify serializable string representation for parsers
---
 .../java/org/apache/flink/table/types/logical/AnyType.java   |  4 ++--
 .../org/apache/flink/table/types/logical/SymbolType.java     |  2 +-
 .../flink/table/types/logical/TypeInformationAnyType.java    |  2 +-
 .../java/org/apache/flink/table/types/LogicalTypesTest.java  | 12 ++++++------
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
index ba849ca..82e805d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
@@ -35,7 +35,7 @@ import java.util.Set;
  * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem
  * and is only deserialized at the edges. The any type is an extension to the SQL standard.
  *
- * <p>The serialized string representation is {@code ANY(c, s)} where {@code c} is the originating
+ * <p>The serialized string representation is {@code ANY('c', 's')} where {@code c} is the originating
  * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding.
  *
  * @param <T> originating class for this type
@@ -43,7 +43,7 @@ import java.util.Set;
 @PublicEvolving
 public final class AnyType<T> extends LogicalType {
 
-	private static final String FORMAT = "ANY(%s, %s)";
+	private static final String FORMAT = "ANY('%s', '%s')";
 
 	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
 		byte[].class.getName(),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
index c57857c..643e10f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
@@ -40,7 +40,7 @@ import java.util.Objects;
 @PublicEvolving
 public final class SymbolType<T extends TableSymbol> extends LogicalType {
 
-	private static final String FORMAT = "SYMBOL(%s)";
+	private static final String FORMAT = "SYMBOL('%s')";
 
 	private final Class<T> symbolClass;
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
index 2c1c0dc..9e64044 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
@@ -49,7 +49,7 @@ import java.util.Set;
 @PublicEvolving
 public final class TypeInformationAnyType<T> extends LogicalType {
 
-	private static final String FORMAT = "ANY(%s, ?)";
+	private static final String FORMAT = "ANY('%s', ?)";
 
 	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
 		byte[].class.getName(),
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index edf28cf..49feb47 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -557,7 +557,7 @@ public class LogicalTypesTest {
 
 		testEquality(anyType, new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG)));
 
-		testStringSummary(anyType, "ANY(org.apache.flink.api.java.tuple.Tuple2, ?)");
+		testStringSummary(anyType, "ANY('org.apache.flink.api.java.tuple.Tuple2', ?)");
 
 		testNullability(anyType);
 
@@ -572,8 +572,8 @@ public class LogicalTypesTest {
 	public void testAnyType() {
 		testAll(
 			new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig())),
-				"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
-					"AEdvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uS3J5b1Nlcml" +
+				"ANY('org.apache.flink.table.types.LogicalTypesTest$Human', " +
+					"'AEdvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uS3J5b1Nlcml" +
 					"hbGl6ZXJTbmFwc2hvdAAAAAIAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuTG9naWNhbFR5cG" +
 					"VzVGVzdCRIdW1hbgAABPLGmj1wAAAAAgAzb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5Mb2dpY" +
 					"2FsVHlwZXNUZXN0JEh1bWFuAQAAADUAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuTG9naWNh" +
@@ -583,8 +583,8 @@ public class LogicalTypesTest {
 					"mcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uU2VyaWFsaXplcnMk" +
 					"RHVtbXlBdnJvUmVnaXN0ZXJlZENsYXNzAAAAAQBZb3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXB" +
 					"ldXRpbHMucnVudGltZS5rcnlvLlNlcmlhbGl6ZXJzJER1bW15QXZyb0tyeW9TZXJpYWxpemVyQ2xhc3" +
-					"MAAATyxpo9cAAAAAAAAATyxpo9cAAAAAA=)",
-			"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, ...)",
+					"MAAATyxpo9cAAAAAAAAATyxpo9cAAAAAA=')",
+			"ANY('org.apache.flink.table.types.LogicalTypesTest$Human', '...')",
 			new Class[]{Human.class, User.class}, // every User is Human
 			new Class[]{Human.class},
 			new LogicalType[]{},
@@ -598,7 +598,7 @@ public class LogicalTypesTest {
 
 		testEquality(symbolType, new SymbolType<>(TimePointUnit.class));
 
-		testStringSummary(symbolType, "SYMBOL(" + TimeIntervalUnit.class.getName() + ")");
+		testStringSummary(symbolType, "SYMBOL('" + TimeIntervalUnit.class.getName() + "')");
 
 		testNullability(symbolType);
 


[flink] 06/06: [FLINK-13078][table-common] Add a logical type parser

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2922105e08ac307835022acbf0bab4ed819d862
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:52:30 2019 +0200

    [FLINK-13078][table-common] Add a logical type parser
    
    This adds a parser for all logical types defined in FLIP-37.
    
    This closes #9061.
---
 .../flink/table/types/logical/LogicalType.java     |   3 +
 .../types/logical/utils/LogicalTypeParser.java     | 900 +++++++++++++++++++++
 .../apache/flink/table/utils/EncodingUtils.java    |   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 ++++++++++++
 5 files changed, 1437 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 4e4942a..cc46533 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -98,6 +99,8 @@ public abstract class LogicalType implements Serializable {
 	 * Returns a string that fully serializes this instance. The serialized string can be used for
 	 * transmitting or persisting a type.
 	 *
+	 * <p>See {@link LogicalTypeParser} for the reverse operation.
+	 *
 	 * @return detailed string for transmission or persistence
 	 */
 	public abstract String asSerializableString();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
new file mode 100644
index 0000000..b6fcd07
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Parser for creating instances of {@link LogicalType} from a serialized string created with
+ * {@link LogicalType#asSerializableString()}.
+ *
+ * <p>In addition to the serializable string representations, this parser also supports common
+ * shortcuts for certain types. This includes:
+ * <ul>
+ *     <li>{@code STRING} as a synonym for {@code VARCHAR(INT_MAX)}</li>
+ *     <li>{@code BYTES} as a synonym for {@code VARBINARY(INT_MAX)}</li>
+ *     <li>{@code NUMERIC} and {@code DEC} as synonyms for {@code DECIMAL}</li>
+ *     <li>{@code INTEGER} as a synonym for {@code INT}</li>
+ *     <li>{@code DOUBLE PRECISION} as a synonym for {@code DOUBLE}</li>
+ *     <li>{@code TIME WITHOUT TIME ZONE} as a synonym for {@code TIME}</li>
+ *     <li>{@code TIMESTAMP WITHOUT TIME ZONE} as a synonym for {@code TIMESTAMP}</li>
+ *     <li>{@code type ARRAY} as a synonym for {@code ARRAY<type>}</li>
+ *     <li>{@code type MULTISET} as a synonym for {@code MULTISET<type>}</li>
+ *     <li>{@code ROW(...)} as a synonym for {@code ROW<...>}</li>
+ *     <li>{@code type NULL} as a synonym for {@code type}</li>
+ * </ul>
+ *
+ * <p>Furthermore, it returns {@link UnresolvedUserDefinedType} for unknown types (partially or fully
+ * qualified such as {@code [catalog].[database].[type]}).
+ */
+@PublicEvolving
+public final class LogicalTypeParser {
+
+	/**
+	 * Parses a type string. All types will be fully resolved except for {@link UnresolvedUserDefinedType}s.
+	 *
+	 * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+	 * @param classLoader class loader for loading classes of the ANY type
+	 * @throws ValidationException in case of parsing errors.
+	 */
+	public static LogicalType parse(String typeString, ClassLoader classLoader) {
+		final List<Token> tokens = tokenize(typeString);
+		final TokenParser converter = new TokenParser(typeString, tokens, classLoader);
+		return converter.parseTokens();
+	}
+
+	/**
+	 * Parses a type string. All types will be fully resolved except for {@link UnresolvedUserDefinedType}s.
+	 *
+	 * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+	 * @throws ValidationException in case of parsing errors.
+	 */
+	public static LogicalType parse(String typeString) {
+		return parse(typeString, Thread.currentThread().getContextClassLoader());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Tokenizer
+	// --------------------------------------------------------------------------------------------
+
+	private static final char CHAR_BEGIN_SUBTYPE = '<';
+	private static final char CHAR_END_SUBTYPE = '>';
+	private static final char CHAR_BEGIN_PARAMETER = '(';
+	private static final char CHAR_END_PARAMETER = ')';
+	private static final char CHAR_LIST_SEPARATOR = ',';
+	private static final char CHAR_STRING = '\'';
+	private static final char CHAR_IDENTIFIER = '`';
+	private static final char CHAR_DOT = '.';
+
+	private static boolean isDelimiter(char character) {
+		return Character.isWhitespace(character) ||
+			character == CHAR_BEGIN_SUBTYPE ||
+			character == CHAR_END_SUBTYPE ||
+			character == CHAR_BEGIN_PARAMETER ||
+			character == CHAR_END_PARAMETER ||
+			character == CHAR_LIST_SEPARATOR ||
+			character == CHAR_DOT;
+	}
+
+	private static boolean isDigit(char c) {
+		return c >= '0' && c <= '9';
+	}
+
+	private static List<Token> tokenize(String chars) {
+		final List<Token> tokens = new ArrayList<>();
+		final StringBuilder builder = new StringBuilder();
+		for (int cursor = 0; cursor < chars.length(); cursor++) {
+			char curChar = chars.charAt(cursor);
+			switch (curChar) {
+				case CHAR_BEGIN_SUBTYPE:
+					tokens.add(new Token(TokenType.BEGIN_SUBTYPE, cursor, Character.toString(CHAR_BEGIN_SUBTYPE)));
+					break;
+				case CHAR_END_SUBTYPE:
+					tokens.add(new Token(TokenType.END_SUBTYPE, cursor, Character.toString(CHAR_END_SUBTYPE)));
+					break;
+				case CHAR_BEGIN_PARAMETER:
+					tokens.add(new Token(TokenType.BEGIN_PARAMETER, cursor, Character.toString(CHAR_BEGIN_PARAMETER)));
+					break;
+				case CHAR_END_PARAMETER:
+					tokens.add(new Token(TokenType.END_PARAMETER, cursor, Character.toString(CHAR_END_PARAMETER)));
+					break;
+				case CHAR_LIST_SEPARATOR:
+					tokens.add(new Token(TokenType.LIST_SEPARATOR, cursor, Character.toString(CHAR_LIST_SEPARATOR)));
+					break;
+				case CHAR_DOT:
+					tokens.add(new Token(TokenType.IDENTIFIER_SEPARATOR, cursor, Character.toString(CHAR_DOT)));
+					break;
+				case CHAR_STRING:
+					builder.setLength(0);
+					cursor = consumeEscaped(builder, chars, cursor, CHAR_STRING);
+					tokens.add(new Token(TokenType.LITERAL_STRING, cursor, builder.toString()));
+					break;
+				case CHAR_IDENTIFIER:
+					builder.setLength(0);
+					cursor = consumeEscaped(builder, chars, cursor, CHAR_IDENTIFIER);
+					tokens.add(new Token(TokenType.IDENTIFIER, cursor, builder.toString()));
+					break;
+				default:
+					if (Character.isWhitespace(curChar)) {
+						continue;
+					}
+					if (isDigit(curChar)) {
+						builder.setLength(0);
+						cursor = consumeInt(builder, chars, cursor);
+						tokens.add(new Token(TokenType.LITERAL_INT, cursor, builder.toString()));
+						break;
+					}
+					builder.setLength(0);
+					cursor = consumeIdentifier(builder, chars, cursor);
+					final String token = builder.toString();
+					final String normalizedToken = token.toUpperCase();
+					if (KEYWORDS.contains(normalizedToken)) {
+						tokens.add(new Token(TokenType.KEYWORD, cursor, normalizedToken));
+					} else {
+						tokens.add(new Token(TokenType.IDENTIFIER, cursor, token));
+					}
+			}
+		}
+
+		return tokens;
+	}
+
+	private static int consumeEscaped(StringBuilder builder, String chars, int cursor, char delimiter) {
+		// skip delimiter
+		cursor++;
+		for (; chars.length() > cursor; cursor++) {
+			final char curChar = chars.charAt(cursor);
+			if (curChar == delimiter && cursor + 1 < chars.length() && chars.charAt(cursor + 1) == delimiter) {
+				// escaping of the escaping char e.g. "'Hello '' World'"
+				cursor++;
+				builder.append(curChar);
+			} else if (curChar == delimiter) {
+				break;
+			} else {
+				builder.append(curChar);
+			}
+		}
+		return cursor;
+	}
+
+	private static int consumeInt(StringBuilder builder, String chars, int cursor) {
+		for (; chars.length() > cursor && isDigit(chars.charAt(cursor)); cursor++) {
+			builder.append(chars.charAt(cursor));
+		}
+		return cursor - 1;
+	}
+
+	private static int consumeIdentifier(StringBuilder builder, String chars, int cursor) {
+		for (; cursor < chars.length() && !isDelimiter(chars.charAt(cursor)); cursor++) {
+			builder.append(chars.charAt(cursor));
+		}
+		return cursor - 1;
+	}
+
+	private enum TokenType {
+		// e.g. "ROW<"
+		BEGIN_SUBTYPE,
+
+		// e.g. "ROW<..>"
+		END_SUBTYPE,
+
+		// e.g. "CHAR("
+		BEGIN_PARAMETER,
+
+		// e.g. "CHAR(...)"
+		END_PARAMETER,
+
+		// e.g. "ROW<INT,"
+		LIST_SEPARATOR,
+
+		// e.g. "ROW<name INT 'Comment'"
+		LITERAL_STRING,
+
+		// CHAR(12
+		LITERAL_INT,
+
+		// e.g. "CHAR" or "TO"
+		KEYWORD,
+
+		// e.g. "ROW<name" or "myCatalog.myDatabase"
+		IDENTIFIER,
+
+		// e.g. "myCatalog.myDatabase."
+		IDENTIFIER_SEPARATOR
+	}
+
+	private enum Keyword {
+		CHAR,
+		VARCHAR,
+		STRING,
+		BOOLEAN,
+		BINARY,
+		VARBINARY,
+		BYTES,
+		DECIMAL,
+		NUMERIC,
+		DEC,
+		TINYINT,
+		SMALLINT,
+		INT,
+		INTEGER,
+		BIGINT,
+		FLOAT,
+		DOUBLE,
+		PRECISION,
+		DATE,
+		TIME,
+		WITH,
+		WITHOUT,
+		LOCAL,
+		ZONE,
+		TIMESTAMP,
+		INTERVAL,
+		YEAR,
+		MONTH,
+		DAY,
+		HOUR,
+		MINUTE,
+		SECOND,
+		TO,
+		ARRAY,
+		MULTISET,
+		MAP,
+		ROW,
+		NULL,
+		ANY,
+		NOT
+	}
+
+	private static final Set<String> KEYWORDS = Stream.of(Keyword.values())
+		.map(k -> k.toString().toUpperCase())
+		.collect(Collectors.toSet());
+
+	private static class Token {
+		public final TokenType type;
+		public final int cursorPosition;
+		public final String value;
+
+		public Token(TokenType type, int cursorPosition, String value) {
+			this.type = type;
+			this.cursorPosition = cursorPosition;
+			this.value = value;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Token Parsing
+	// --------------------------------------------------------------------------------------------
+
+	private static class TokenParser {
+
+		private final String inputString;
+
+		private final List<Token> tokens;
+
+		private final ClassLoader classLoader;
+
+		private int lastValidToken;
+
+		private int currentToken;
+
+		public TokenParser(String inputString, List<Token> tokens, ClassLoader classLoader) {
+			this.inputString = inputString;
+			this.tokens = tokens;
+			this.classLoader = classLoader;
+			this.lastValidToken = -1;
+			this.currentToken = -1;
+		}
+
+		private LogicalType parseTokens() {
+			final LogicalType type = parseTypeWithNullability();
+			if (hasRemainingTokens()) {
+				nextToken();
+				throw parsingError("Unexpected token: " + token().value);
+			}
+			return type;
+		}
+
+		private int lastCursor() {
+			if (lastValidToken < 0) {
+				return 0;
+			}
+			return tokens.get(lastValidToken).cursorPosition + 1;
+		}
+
+		private ValidationException parsingError(String cause, @Nullable Throwable t) {
+			return new ValidationException(
+				String.format(
+					"Could not parse type at position %d: %s\n Input type string: %s",
+					lastCursor(),
+					cause,
+					inputString),
+				t);
+		}
+
+		private ValidationException parsingError(String cause) {
+			return parsingError(cause, null);
+		}
+
+		private boolean hasRemainingTokens() {
+			return currentToken + 1 < tokens.size();
+		}
+
+		private Token token() {
+			return tokens.get(currentToken);
+		}
+
+		private int tokenAsInt() {
+			try {
+				return Integer.valueOf(token().value);
+			} catch (NumberFormatException e) {
+				throw parsingError("Invalid integer value.", e);
+			}
+		}
+
+		private Keyword tokenAsKeyword() {
+			return Keyword.valueOf(token().value);
+		}
+
+		private String tokenAsString() {
+			return token().value;
+		}
+
+		private void nextToken() {
+			this.currentToken++;
+			if (currentToken >= tokens.size()) {
+				throw parsingError("Unexpected end.");
+			}
+			lastValidToken = this.currentToken - 1;
+		}
+
+		private void nextToken(TokenType type) {
+			nextToken();
+			final Token token = token();
+			if (token.type != type) {
+				throw parsingError("<" + type.name() + "> expected but was <" + token.type + ">.");
+			}
+		}
+
+		private void nextToken(Keyword keyword) {
+			nextToken(TokenType.KEYWORD);
+			final Token token = token();
+			if (!keyword.equals(Keyword.valueOf(token.value))) {
+				throw parsingError("Keyword '" + keyword + "' expected but was '" + token.value + "'.");
+			}
+		}
+
+		private boolean hasNextToken(TokenType... types) {
+			if (currentToken + types.length + 1 > tokens.size()) {
+				return false;
+			}
+			for (int i = 0; i < types.length; i++) {
+				final Token lookAhead = tokens.get(currentToken + i + 1);
+				if (lookAhead.type != types[i]) {
+					return false;
+				}
+			}
+			return true;
+		}
+
+		private boolean hasNextToken(Keyword... keywords) {
+			if (currentToken + keywords.length + 1 > tokens.size()) {
+				return false;
+			}
+			for (int i = 0; i < keywords.length; i++) {
+				final Token lookAhead = tokens.get(currentToken + i + 1);
+				if (lookAhead.type != TokenType.KEYWORD ||
+						keywords[i] != Keyword.valueOf(lookAhead.value)) {
+					return false;
+				}
+			}
+			return true;
+		}
+
+		private boolean parseNullability() {
+			// "NOT NULL"
+			if (hasNextToken(Keyword.NOT, Keyword.NULL)) {
+				nextToken(Keyword.NOT);
+				nextToken(Keyword.NULL);
+				return false;
+			}
+			// explicit "NULL"
+			else if (hasNextToken(Keyword.NULL)) {
+				nextToken(Keyword.NULL);
+				return true;
+			}
+			// implicit "NULL"
+			return true;
+		}
+
+		private LogicalType parseTypeWithNullability() {
+			final LogicalType logicalType;
+			if (hasNextToken(TokenType.IDENTIFIER)) {
+				logicalType = parseTypeByIdentifier().copy(parseNullability());
+			} else {
+				logicalType = parseTypeByKeyword().copy(parseNullability());
+			}
+
+			// special case: suffix notation for ARRAY and MULTISET types
+			if (hasNextToken(Keyword.ARRAY)) {
+				nextToken(Keyword.ARRAY);
+				return new ArrayType(logicalType).copy(parseNullability());
+			} else if (hasNextToken(Keyword.MULTISET)) {
+				nextToken(Keyword.MULTISET);
+				return new MultisetType(logicalType).copy(parseNullability());
+			}
+
+			return logicalType;
+		}
+
+		private LogicalType parseTypeByKeyword() {
+			nextToken(TokenType.KEYWORD);
+			switch (tokenAsKeyword()) {
+				case CHAR:
+					return parseCharType();
+				case VARCHAR:
+					return parseVarCharType();
+				case STRING:
+					return new VarCharType(VarCharType.MAX_LENGTH);
+				case BOOLEAN:
+					return new BooleanType();
+				case BINARY:
+					return parseBinaryType();
+				case VARBINARY:
+					return parseVarBinaryType();
+				case BYTES:
+					return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+				case DECIMAL:
+				case NUMERIC:
+				case DEC:
+					return parseDecimalType();
+				case TINYINT:
+					return new TinyIntType();
+				case SMALLINT:
+					return new SmallIntType();
+				case INT:
+				case INTEGER:
+					return new IntType();
+				case BIGINT:
+					return new BigIntType();
+				case FLOAT:
+					return new FloatType();
+				case DOUBLE:
+					return parseDoubleType();
+				case DATE:
+					return new DateType();
+				case TIME:
+					return parseTimeType();
+				case TIMESTAMP:
+					return parseTimestampType();
+				case INTERVAL:
+					return parseIntervalType();
+				case ARRAY:
+					return parseArrayType();
+				case MULTISET:
+					return parseMultisetType();
+				case MAP:
+					return parseMapType();
+				case ROW:
+					return parseRowType();
+				case NULL:
+					return new NullType();
+				case ANY:
+					return parseAnyType();
+				default:
+					throw parsingError("Unsupported type: " + token().value);
+			}
+		}
+
+		private LogicalType parseTypeByIdentifier() {
+			nextToken(TokenType.IDENTIFIER);
+			List<String> parts = new ArrayList<>();
+			parts.add(tokenAsString());
+			if (hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+				nextToken(TokenType.IDENTIFIER_SEPARATOR);
+				nextToken(TokenType.IDENTIFIER);
+				parts.add(tokenAsString());
+			}
+			if (hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+				nextToken(TokenType.IDENTIFIER_SEPARATOR);
+				nextToken(TokenType.IDENTIFIER);
+				parts.add(tokenAsString());
+			}
+			return new UnresolvedUserDefinedType(
+				lastPart(parts, 2),
+				lastPart(parts, 1),
+				lastPart(parts, 0));
+		}
+
+		private @Nullable String lastPart(List<String> parts, int inversePos) {
+			final int pos = parts.size() - inversePos - 1;
+			if (pos < 0) {
+				return null;
+			}
+			return parts.get(pos);
+		}
+
+		private int parseStringType() {
+			// explicit length
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				nextToken(TokenType.LITERAL_INT);
+				final int length = tokenAsInt();
+				nextToken(TokenType.END_PARAMETER);
+				return length;
+			}
+			// implicit length
+			return -1;
+		}
+
+		private LogicalType parseCharType() {
+			final int length = parseStringType();
+			if (length < 0) {
+				return new CharType();
+			} else {
+				return new CharType(length);
+			}
+		}
+
+		private LogicalType parseVarCharType() {
+			final int length = parseStringType();
+			if (length < 0) {
+				return new VarCharType();
+			} else {
+				return new VarCharType(length);
+			}
+		}
+
+		private LogicalType parseBinaryType() {
+			final int length = parseStringType();
+			if (length < 0) {
+				return new BinaryType();
+			} else {
+				return new BinaryType(length);
+			}
+		}
+
+		private LogicalType parseVarBinaryType() {
+			final int length = parseStringType();
+			if (length < 0) {
+				return new VarBinaryType();
+			} else {
+				return new VarBinaryType(length);
+			}
+		}
+
+		private LogicalType parseDecimalType() {
+			int precision = DecimalType.DEFAULT_PRECISION;
+			int scale = DecimalType.DEFAULT_SCALE;
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				nextToken(TokenType.LITERAL_INT);
+				precision = tokenAsInt();
+				if (hasNextToken(TokenType.LIST_SEPARATOR)) {
+					nextToken(TokenType.LIST_SEPARATOR);
+					nextToken(TokenType.LITERAL_INT);
+					scale = tokenAsInt();
+				}
+				nextToken(TokenType.END_PARAMETER);
+			}
+			return new DecimalType(precision, scale);
+		}
+
+		private LogicalType parseDoubleType() {
+			if (hasNextToken(Keyword.PRECISION)) {
+				nextToken(Keyword.PRECISION);
+			}
+			return new DoubleType();
+		}
+
+		private LogicalType parseTimeType() {
+			int precision = parseOptionalPrecision(TimeType.DEFAULT_PRECISION);
+			if (hasNextToken(Keyword.WITHOUT)) {
+				nextToken(Keyword.WITHOUT);
+				nextToken(Keyword.TIME);
+				nextToken(Keyword.ZONE);
+			}
+			return new TimeType(precision);
+		}
+
+		private LogicalType parseTimestampType() {
+			int precision = parseOptionalPrecision(TimestampType.DEFAULT_PRECISION);
+			if (hasNextToken(Keyword.WITHOUT)) {
+				nextToken(Keyword.WITHOUT);
+				nextToken(Keyword.TIME);
+				nextToken(Keyword.ZONE);
+			} else if (hasNextToken(Keyword.WITH)) {
+				nextToken(Keyword.WITH);
+				if (hasNextToken(Keyword.LOCAL)) {
+					nextToken(Keyword.LOCAL);
+					nextToken(Keyword.TIME);
+					nextToken(Keyword.ZONE);
+					return new LocalZonedTimestampType(precision);
+				} else {
+					nextToken(Keyword.TIME);
+					nextToken(Keyword.ZONE);
+					return new ZonedTimestampType(precision);
+				}
+			}
+			return new TimestampType(precision);
+		}
+
+		private LogicalType parseIntervalType() {
+			nextToken(TokenType.KEYWORD);
+			switch (tokenAsKeyword()) {
+				case YEAR:
+				case MONTH:
+					return parseYearMonthIntervalType();
+				case DAY:
+				case HOUR:
+				case MINUTE:
+				case SECOND:
+					return parseDayTimeIntervalType();
+				default:
+					throw parsingError("Invalid interval resolution.");
+			}
+		}
+
+		private LogicalType parseYearMonthIntervalType() {
+			int yearPrecision = YearMonthIntervalType.DEFAULT_PRECISION;
+			switch (tokenAsKeyword()) {
+				case YEAR:
+					yearPrecision = parseOptionalPrecision(yearPrecision);
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(Keyword.MONTH);
+						return new YearMonthIntervalType(
+							YearMonthResolution.YEAR_TO_MONTH,
+							yearPrecision);
+					}
+					return new YearMonthIntervalType(
+						YearMonthResolution.YEAR,
+						yearPrecision);
+				case MONTH:
+					return new YearMonthIntervalType(
+						YearMonthResolution.MONTH,
+						yearPrecision);
+				default:
+					throw parsingError("Invalid year-month interval resolution.");
+			}
+		}
+
+		private LogicalType parseDayTimeIntervalType() {
+			int dayPrecision = DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+			int fractionalPrecision = DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION;
+			switch (tokenAsKeyword()) {
+				case DAY:
+					dayPrecision = parseOptionalPrecision(dayPrecision);
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(TokenType.KEYWORD);
+						switch (tokenAsKeyword()) {
+							case HOUR:
+								return new DayTimeIntervalType(
+									DayTimeResolution.DAY_TO_HOUR,
+									dayPrecision,
+									fractionalPrecision);
+							case MINUTE:
+								return new DayTimeIntervalType(
+									DayTimeResolution.DAY_TO_MINUTE,
+									dayPrecision,
+									fractionalPrecision);
+							case SECOND:
+								fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+								return new DayTimeIntervalType(
+									DayTimeResolution.DAY_TO_SECOND,
+									dayPrecision,
+									fractionalPrecision);
+							default:
+								throw parsingError("Invalid day-time interval resolution.");
+						}
+					}
+					return new DayTimeIntervalType(
+						DayTimeResolution.DAY,
+						dayPrecision,
+						fractionalPrecision);
+				case HOUR:
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(TokenType.KEYWORD);
+						switch (tokenAsKeyword()) {
+							case MINUTE:
+								return new DayTimeIntervalType(
+									DayTimeResolution.HOUR_TO_MINUTE,
+									dayPrecision,
+									fractionalPrecision);
+							case SECOND:
+								fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+								return new DayTimeIntervalType(
+									DayTimeResolution.HOUR_TO_SECOND,
+									dayPrecision,
+									fractionalPrecision);
+							default:
+								throw parsingError("Invalid day-time interval resolution.");
+						}
+					}
+					return new DayTimeIntervalType(
+						DayTimeResolution.HOUR,
+						dayPrecision,
+						fractionalPrecision);
+				case MINUTE:
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(Keyword.SECOND);
+						fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+						return new DayTimeIntervalType(
+							DayTimeResolution.MINUTE_TO_SECOND,
+							dayPrecision,
+							fractionalPrecision);
+					}
+					return new DayTimeIntervalType(
+						DayTimeResolution.MINUTE,
+						dayPrecision,
+						fractionalPrecision);
+				case SECOND:
+					fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+					return new DayTimeIntervalType(
+						DayTimeResolution.SECOND,
+						dayPrecision,
+						fractionalPrecision);
+				default:
+					throw parsingError("Invalid day-time interval resolution.");
+			}
+		}
+
+		private int parseOptionalPrecision(int defaultPrecision) {
+			int precision = defaultPrecision;
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				nextToken(TokenType.LITERAL_INT);
+				precision = tokenAsInt();
+				nextToken(TokenType.END_PARAMETER);
+			}
+			return precision;
+		}
+
+		private LogicalType parseArrayType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType elementType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new ArrayType(elementType);
+		}
+
+		private LogicalType parseMultisetType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType elementType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new MultisetType(elementType);
+		}
+
+		private LogicalType parseMapType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType keyType = parseTypeWithNullability();
+			nextToken(TokenType.LIST_SEPARATOR);
+			final LogicalType valueType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new MapType(keyType, valueType);
+		}
+
+		private LogicalType parseRowType() {
+			List<RowType.RowField> fields;
+			// SQL standard notation
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				fields = parseRowFields(TokenType.END_PARAMETER);
+				nextToken(TokenType.END_PARAMETER);
+			} else {
+				nextToken(TokenType.BEGIN_SUBTYPE);
+				fields = parseRowFields(TokenType.END_SUBTYPE);
+				nextToken(TokenType.END_SUBTYPE);
+			}
+			return new RowType(fields);
+		}
+
+		private List<RowType.RowField> parseRowFields(TokenType endToken) {
+			List<RowType.RowField> fields = new ArrayList<>();
+			boolean isFirst = true;
+			while (!hasNextToken(endToken)) {
+				if (isFirst) {
+					isFirst = false;
+				} else {
+					nextToken(TokenType.LIST_SEPARATOR);
+				}
+				nextToken(TokenType.IDENTIFIER);
+				final String name = tokenAsString();
+				final LogicalType type = parseTypeWithNullability();
+				if (hasNextToken(TokenType.LITERAL_STRING)) {
+					nextToken(TokenType.LITERAL_STRING);
+					final String description = tokenAsString();
+					fields.add(new RowType.RowField(name, type, description));
+				} else {
+					fields.add(new RowType.RowField(name, type));
+				}
+			}
+			return fields;
+		}
+
+		@SuppressWarnings("unchecked")
+		private LogicalType parseAnyType() {
+			nextToken(TokenType.BEGIN_PARAMETER);
+			nextToken(TokenType.LITERAL_STRING);
+			final String className = tokenAsString();
+
+			nextToken(TokenType.LIST_SEPARATOR);
+			nextToken(TokenType.LITERAL_STRING);
+			final String serializer = tokenAsString();
+			nextToken(TokenType.END_PARAMETER);
+
+			try {
+				final Class<?> clazz = Class.forName(className, true, classLoader);
+				final byte[] bytes = EncodingUtils.decodeBase64ToBytes(serializer);
+				final DataInputDeserializer inputDeserializer = new DataInputDeserializer(bytes);
+				final TypeSerializerSnapshot<?> snapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+					inputDeserializer,
+					classLoader);
+				return new AnyType(clazz, snapshot.restoreSerializer());
+			} catch (Throwable t) {
+				throw parsingError(
+					"Unable to restore the ANY type of class '" + className + "' with " +
+						"serializer snapshot '" + serializer + "'.", t);
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 95d10e3..ca88427 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -110,12 +110,16 @@ public abstract class EncodingUtils {
 		return new String(java.util.Base64.getEncoder().encode(bytes), UTF_8);
 	}
 
+	public static byte[] decodeBase64ToBytes(String base64) {
+		return java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8));
+	}
+
 	public static String encodeStringToBase64(String string) {
 		return encodeBytesToBase64(string.getBytes(UTF_8));
 	}
 
 	public static String decodeBase64ToString(String base64) {
-		return new String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+		return new String(decodeBase64ToBytes(base64), UTF_8);
 	}
 
 	public static byte[] md5(String string) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
index faf26d0..6bf414a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
@@ -32,13 +32,21 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
-  * Utilities to convert {@link TypeInformation} into a string representation and back.
-  */
+ * Utilities to convert {@link TypeInformation} into a string representation and back.
+ *
+ * @deprecated This utility is based on {@link TypeInformation}. However, the Table & SQL API is
+ *             currently updated to use {@link DataType}s based on {@link LogicalType}s. Use
+ *             {@link LogicalTypeParser} instead.
+ */
+@Deprecated
 @PublicEvolving
 public class TypeStringUtils {
 
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
new file mode 100644
index 0000000..df55424
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.UNRESOLVED;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeParser}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeParserTest {
+
+	@Parameters(name = "{index}: [From: {0}, To: {1}]")
+	public static List<TestSpec> testData() {
+		return Arrays.asList(
+
+			TestSpec
+				.forString("CHAR")
+				.expectType(new CharType()),
+
+			TestSpec
+				.forString("CHAR NOT NULL")
+				.expectType(new  CharType().copy(false)),
+
+			TestSpec
+				.forString("CHAR   NOT \t\nNULL")
+				.expectType(new  CharType().copy(false)),
+
+			TestSpec
+				.forString("char not null")
+				.expectType(new CharType().copy(false)),
+
+			TestSpec
+				.forString("CHAR NULL")
+				.expectType(new CharType()),
+
+			TestSpec
+				.forString("CHAR(33)")
+				.expectType(new CharType(33)),
+
+			TestSpec
+				.forString("VARCHAR")
+				.expectType(new VarCharType()),
+
+			TestSpec
+				.forString("VARCHAR(33)")
+				.expectType(new VarCharType(33)),
+
+			TestSpec
+				.forString("STRING")
+				.expectType(new VarCharType(VarCharType.MAX_LENGTH)),
+
+			TestSpec
+				.forString("BOOLEAN")
+				.expectType(new BooleanType()),
+
+			TestSpec
+				.forString("BINARY")
+				.expectType(new BinaryType()),
+
+			TestSpec
+				.forString("BINARY(33)")
+				.expectType(new BinaryType(33)),
+
+			TestSpec
+				.forString("VARBINARY")
+				.expectType(new VarBinaryType()),
+
+			TestSpec
+				.forString("VARBINARY(33)")
+				.expectType(new VarBinaryType(33)),
+
+			TestSpec
+				.forString("BYTES")
+				.expectType(new VarBinaryType(VarBinaryType.MAX_LENGTH)),
+
+			TestSpec
+				.forString("DECIMAL")
+				.expectType(new DecimalType()),
+
+			TestSpec
+				.forString("DEC")
+				.expectType(new DecimalType()),
+
+			TestSpec
+				.forString("NUMERIC")
+				.expectType(new DecimalType()),
+
+			TestSpec
+				.forString("DECIMAL(10)")
+				.expectType(new DecimalType(10)),
+
+			TestSpec
+				.forString("DEC(10)")
+				.expectType(new DecimalType(10)),
+
+			TestSpec
+				.forString("NUMERIC(10)")
+				.expectType(new DecimalType(10)),
+
+			TestSpec
+				.forString("DECIMAL(10, 3)")
+				.expectType(new DecimalType(10, 3)),
+
+			TestSpec
+				.forString("DEC(10, 3)")
+				.expectType(new DecimalType(10, 3)),
+
+			TestSpec
+				.forString("NUMERIC(10, 3)")
+				.expectType(new DecimalType(10, 3)),
+
+			TestSpec
+				.forString("TINYINT")
+				.expectType(new TinyIntType()),
+
+			TestSpec
+				.forString("SMALLINT")
+				.expectType(new SmallIntType()),
+
+			TestSpec
+				.forString("INTEGER")
+				.expectType(new IntType()),
+
+			TestSpec
+				.forString("INT")
+				.expectType(new IntType()),
+
+			TestSpec
+				.forString("BIGINT")
+				.expectType(new BigIntType()),
+
+			TestSpec
+				.forString("FLOAT")
+				.expectType(new FloatType()),
+
+			TestSpec
+				.forString("DOUBLE")
+				.expectType(new DoubleType()),
+
+			TestSpec
+				.forString("DOUBLE PRECISION")
+				.expectType(new DoubleType()),
+
+			TestSpec
+				.forString("DATE")
+				.expectType(new DateType()),
+
+			TestSpec
+				.forString("TIME")
+				.expectType(new TimeType()),
+
+			TestSpec
+				.forString("TIME(3)")
+				.expectType(new TimeType(3)),
+
+			TestSpec
+				.forString("TIME WITHOUT TIME ZONE")
+				.expectType(new TimeType()),
+
+			TestSpec
+				.forString("TIME(3) WITHOUT TIME ZONE")
+				.expectType(new TimeType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP")
+				.expectType(new TimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3)")
+				.expectType(new TimestampType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP WITHOUT TIME ZONE")
+				.expectType(new TimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3) WITHOUT TIME ZONE")
+				.expectType(new TimestampType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP WITH TIME ZONE")
+				.expectType(new ZonedTimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3) WITH TIME ZONE")
+				.expectType(new ZonedTimestampType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP WITH LOCAL TIME ZONE")
+				.expectType(new LocalZonedTimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3) WITH LOCAL TIME ZONE")
+				.expectType(new LocalZonedTimestampType(3)),
+
+			TestSpec
+				.forString("INTERVAL YEAR")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR)),
+
+			TestSpec
+				.forString("INTERVAL YEAR(4)")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR, 4)),
+
+			TestSpec
+				.forString("INTERVAL MONTH")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.MONTH)),
+
+			TestSpec
+				.forString("INTERVAL YEAR TO MONTH")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH)),
+
+			TestSpec
+				.forString("INTERVAL YEAR(4) TO MONTH")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, 4)),
+
+			TestSpec
+				.forString("INTERVAL DAY(2) TO SECOND(3)")
+				.expectType(new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, 2, 3)),
+
+			TestSpec
+				.forString("INTERVAL HOUR TO SECOND(3)")
+				.expectType(
+					new DayTimeIntervalType(
+						DayTimeResolution.HOUR_TO_SECOND,
+						DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+						3)
+				),
+
+			TestSpec
+				.forString("INTERVAL MINUTE")
+				.expectType(new DayTimeIntervalType(DayTimeResolution.MINUTE)),
+
+			TestSpec
+				.forString("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+				.expectType(new ArrayType(new LocalZonedTimestampType(3))),
+
+			TestSpec
+				.forString("ARRAY<INT NOT NULL>")
+				.expectType(new ArrayType(new IntType(false))),
+
+			TestSpec
+				.forString("INT ARRAY")
+				.expectType(new ArrayType(new IntType())),
+
+			TestSpec
+				.forString("INT NOT NULL ARRAY")
+				.expectType(new ArrayType(new IntType(false))),
+
+			TestSpec
+				.forString("INT ARRAY NOT NULL")
+				.expectType(new ArrayType(false, new IntType())),
+
+			TestSpec
+				.forString("MULTISET<INT NOT NULL>")
+				.expectType(new MultisetType(new IntType(false))),
+
+			TestSpec
+				.forString("INT MULTISET")
+				.expectType(new MultisetType(new IntType())),
+
+			TestSpec
+				.forString("INT NOT NULL MULTISET")
+				.expectType(new MultisetType(new IntType(false))),
+
+			TestSpec
+				.forString("INT MULTISET NOT NULL")
+				.expectType(new MultisetType(false, new IntType())),
+
+			TestSpec
+				.forString("MAP<BIGINT, BOOLEAN>")
+				.expectType(new MapType(new BigIntType(), new BooleanType())),
+
+			TestSpec
+				.forString("ROW<f0 INT NOT NULL, f1 BOOLEAN>")
+				.expectType(
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("f0", new IntType(false)),
+							new RowType.RowField("f1", new BooleanType())))
+				),
+
+			TestSpec
+				.forString("ROW(f0 INT NOT NULL, f1 BOOLEAN)")
+				.expectType(
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("f0", new IntType(false)),
+							new RowType.RowField("f1", new BooleanType())))
+				),
+
+			TestSpec
+				.forString("ROW<`f0` INT>")
+				.expectType(
+					new RowType(
+						Collections.singletonList(new RowType.RowField("f0", new IntType())))
+				),
+
+			TestSpec
+				.forString("ROW(`f0` INT)")
+				.expectType(
+					new RowType(
+						Collections.singletonList(new RowType.RowField("f0", new IntType())))
+				),
+
+			TestSpec
+				.forString("ROW<>")
+				.expectType(new RowType(Collections.emptyList())),
+
+			TestSpec
+				.forString("ROW()")
+				.expectType(new RowType(Collections.emptyList())),
+
+			TestSpec
+				.forString("ROW<f0 INT NOT NULL 'This is a comment.', f1 BOOLEAN 'This as well.'>")
+				.expectType(
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("f0", new IntType(false), "This is a comment."),
+							new RowType.RowField("f1", new BooleanType(), "This as well.")))
+				),
+
+			TestSpec
+				.forString("NULL")
+				.expectType(new NullType()),
+
+			TestSpec
+				.forString(createAnyType(LogicalTypeParserTest.class).asSerializableString())
+				.expectType(createAnyType(LogicalTypeParserTest.class)),
+
+			TestSpec
+				.forString("cat.db.MyType")
+				.expectType(new UnresolvedUserDefinedType("cat", "db", "MyType")),
+
+			TestSpec
+				.forString("`db`.`MyType`")
+				.expectType(new UnresolvedUserDefinedType(null, "db", "MyType")),
+
+			TestSpec
+				.forString("MyType")
+				.expectType(new UnresolvedUserDefinedType(null, null, "MyType")),
+
+			TestSpec
+				.forString("ARRAY<MyType>")
+				.expectType(new ArrayType(new UnresolvedUserDefinedType(null, null, "MyType"))),
+
+			TestSpec
+				.forString("ROW<f0 MyType, f1 `c`.`d`.`t`>")
+				.expectType(
+					RowType.of(
+						new UnresolvedUserDefinedType(null, null, "MyType"),
+						new UnresolvedUserDefinedType("c", "d", "t"))
+				),
+
+			// error message testing
+
+			TestSpec
+				.forString("ROW<`f0")
+				.expectErrorMessage("Unexpected end"),
+
+			TestSpec
+				.forString("ROW<`f0`")
+				.expectErrorMessage("Unexpected end"),
+
+			TestSpec
+				.forString("VARCHAR(test)")
+				.expectErrorMessage("<LITERAL_INT> expected"),
+
+			TestSpec
+				.forString("VARCHAR(33333333333)")
+				.expectErrorMessage("Invalid integer value"),
+
+			TestSpec
+				.forString("ROW<field INT, field2>")
+				.expectErrorMessage("<KEYWORD> expected"),
+
+			TestSpec
+				.forString("ANY('unknown.class', '')")
+				.expectErrorMessage("Unable to restore the ANY type")
+		);
+	}
+
+	@Parameter
+	public TestSpec testSpec;
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testParsing() {
+		if (testSpec.expectedType != null) {
+			assertThat(
+				LogicalTypeParser.parse(testSpec.typeString),
+				equalTo(testSpec.expectedType));
+		}
+	}
+
+	@Test
+	public void testSerializableParsing() {
+		if (testSpec.expectedType != null) {
+			if (!hasRoot(testSpec.expectedType, UNRESOLVED) &&
+					testSpec.expectedType.getChildren().stream().noneMatch(t -> hasRoot(t, UNRESOLVED))) {
+				assertThat(
+					LogicalTypeParser.parse(testSpec.expectedType.asSerializableString()),
+					equalTo(testSpec.expectedType));
+			}
+		}
+	}
+
+	@Test
+	public void testErrorMessage() {
+		if (testSpec.expectedErrorMessage != null) {
+			thrown.expect(ValidationException.class);
+			thrown.expectMessage(testSpec.expectedErrorMessage);
+
+			LogicalTypeParser.parse(testSpec.typeString);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class TestSpec {
+
+		private final String typeString;
+
+		private @Nullable LogicalType expectedType;
+
+		private @Nullable String expectedErrorMessage;
+
+		private TestSpec(String typeString) {
+			this.typeString = typeString;
+		}
+
+		static TestSpec forString(String typeString) {
+			return new TestSpec(typeString);
+		}
+
+		TestSpec expectType(LogicalType expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		TestSpec expectErrorMessage(String expectedErrorMessage) {
+			this.expectedErrorMessage = expectedErrorMessage;
+			return this;
+		}
+	}
+
+	private static <T> AnyType<T> createAnyType(Class<T> clazz) {
+		return new AnyType<>(clazz, new KryoSerializer<>(clazz, new ExecutionConfig()));
+	}
+}