You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:32:04 UTC

[flink] 06/06: [FLINK-13078][table-common] Add a logical type parser

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 479fb070edc4a681d85e4c20a964083751aa3720
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:52:30 2019 +0200

    [FLINK-13078][table-common] Add a logical type parser
    
    This adds a parser for all logical types defined in FLIP-37.
    
    This closes #9061.
---
 .../flink/table/types/logical/LogicalType.java     |   3 +
 .../types/logical/utils/LogicalTypeParser.java     | 900 +++++++++++++++++++++
 .../apache/flink/table/utils/EncodingUtils.java    |   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 ++++++++++++
 5 files changed, 1437 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 4e4942a..cc46533 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -98,6 +99,8 @@ public abstract class LogicalType implements Serializable {
 	 * Returns a string that fully serializes this instance. The serialized string can be used for
 	 * transmitting or persisting a type.
 	 *
+	 * <p>See {@link LogicalTypeParser} for the reverse operation.
+	 *
 	 * @return detailed string for transmission or persistence
 	 */
 	public abstract String asSerializableString();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
new file mode 100644
index 0000000..b6fcd07
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Parser for creating instances of {@link LogicalType} from a serialized string created with
+ * {@link LogicalType#asSerializableString()}.
+ *
+ * <p>In addition to the serializable string representations, this parser also supports common
+ * shortcuts for certain types. This includes:
+ * <ul>
+ *     <li>{@code STRING} as a synonym for {@code VARCHAR(INT_MAX)}</li>
+ *     <li>{@code BYTES} as a synonym for {@code VARBINARY(INT_MAX)}</li>
+ *     <li>{@code NUMERIC} and {@code DEC} as synonyms for {@code DECIMAL}</li>
+ *     <li>{@code INTEGER} as a synonym for {@code INT}</li>
+ *     <li>{@code DOUBLE PRECISION} as a synonym for {@code DOUBLE}</li>
+ *     <li>{@code TIME WITHOUT TIME ZONE} as a synonym for {@code TIME}</li>
+ *     <li>{@code TIMESTAMP WITHOUT TIME ZONE} as a synonym for {@code TIMESTAMP}</li>
+ *     <li>{@code type ARRAY} as a synonym for {@code ARRAY<type>}</li>
+ *     <li>{@code type MULTISET} as a synonym for {@code MULTISET<type>}</li>
+ *     <li>{@code ROW(...)} as a synonym for {@code ROW<...>}</li>
+ *     <li>{@code type NULL} as a synonym for {@code type}</li>
+ * </ul>
+ *
+ * <p>Furthermore, it returns {@link UnresolvedUserDefinedType} for unknown types (partially or fully
+ * qualified such as {@code [catalog].[database].[type]}).
+ */
+@PublicEvolving
+public final class LogicalTypeParser {
+
+	/**
+	 * Parses a type string. All types will be fully resolved except for {@link UnresolvedUserDefinedType}s.
+	 *
+	 * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+	 * @param classLoader class loader for loading classes of the ANY type
+	 * @throws ValidationException in case of parsing errors.
+	 */
+	public static LogicalType parse(String typeString, ClassLoader classLoader) {
+		final List<Token> tokens = tokenize(typeString);
+		final TokenParser converter = new TokenParser(typeString, tokens, classLoader);
+		return converter.parseTokens();
+	}
+
+	/**
+	 * Parses a type string. All types will be fully resolved except for {@link UnresolvedUserDefinedType}s.
+	 *
+	 * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+	 * @throws ValidationException in case of parsing errors.
+	 */
+	public static LogicalType parse(String typeString) {
+		return parse(typeString, Thread.currentThread().getContextClassLoader());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Tokenizer
+	// --------------------------------------------------------------------------------------------
+
+	private static final char CHAR_BEGIN_SUBTYPE = '<';
+	private static final char CHAR_END_SUBTYPE = '>';
+	private static final char CHAR_BEGIN_PARAMETER = '(';
+	private static final char CHAR_END_PARAMETER = ')';
+	private static final char CHAR_LIST_SEPARATOR = ',';
+	private static final char CHAR_STRING = '\'';
+	private static final char CHAR_IDENTIFIER = '`';
+	private static final char CHAR_DOT = '.';
+
+	private static boolean isDelimiter(char character) {
+		return Character.isWhitespace(character) ||
+			character == CHAR_BEGIN_SUBTYPE ||
+			character == CHAR_END_SUBTYPE ||
+			character == CHAR_BEGIN_PARAMETER ||
+			character == CHAR_END_PARAMETER ||
+			character == CHAR_LIST_SEPARATOR ||
+			character == CHAR_DOT;
+	}
+
+	private static boolean isDigit(char c) {
+		return c >= '0' && c <= '9';
+	}
+
+	/** Splits the type string into tokens; whitespace is skipped and keywords are matched case-insensitively. */
+	private static List<Token> tokenize(String chars) {
+		final List<Token> tokens = new ArrayList<>();
+		final StringBuilder builder = new StringBuilder();
+		for (int cursor = 0; cursor < chars.length(); cursor++) {
+			final char curChar = chars.charAt(cursor);
+			// scratch buffer for multi-character tokens, cleared before every token
+			builder.setLength(0);
+			switch (curChar) {
+				case CHAR_BEGIN_SUBTYPE:
+					tokens.add(new Token(TokenType.BEGIN_SUBTYPE, cursor, Character.toString(CHAR_BEGIN_SUBTYPE)));
+					break;
+				case CHAR_END_SUBTYPE:
+					tokens.add(new Token(TokenType.END_SUBTYPE, cursor, Character.toString(CHAR_END_SUBTYPE)));
+					break;
+				case CHAR_BEGIN_PARAMETER:
+					tokens.add(new Token(TokenType.BEGIN_PARAMETER, cursor, Character.toString(CHAR_BEGIN_PARAMETER)));
+					break;
+				case CHAR_END_PARAMETER:
+					tokens.add(new Token(TokenType.END_PARAMETER, cursor, Character.toString(CHAR_END_PARAMETER)));
+					break;
+				case CHAR_LIST_SEPARATOR:
+					tokens.add(new Token(TokenType.LIST_SEPARATOR, cursor, Character.toString(CHAR_LIST_SEPARATOR)));
+					break;
+				case CHAR_DOT:
+					tokens.add(new Token(TokenType.IDENTIFIER_SEPARATOR, cursor, Character.toString(CHAR_DOT)));
+					break;
+				case CHAR_STRING:
+					cursor = consumeEscaped(builder, chars, cursor, CHAR_STRING);
+					tokens.add(new Token(TokenType.LITERAL_STRING, cursor, builder.toString()));
+					break;
+				case CHAR_IDENTIFIER:
+					cursor = consumeEscaped(builder, chars, cursor, CHAR_IDENTIFIER);
+					tokens.add(new Token(TokenType.IDENTIFIER, cursor, builder.toString()));
+					break;
+				default:
+					if (Character.isWhitespace(curChar)) {
+						continue;
+					}
+					if (isDigit(curChar)) {
+						cursor = consumeInt(builder, chars, cursor);
+						tokens.add(new Token(TokenType.LITERAL_INT, cursor, builder.toString()));
+						break;
+					}
+					cursor = consumeIdentifier(builder, chars, cursor);
+					final String token = builder.toString();
+					// Locale.ROOT: with e.g. a Turkish default locale "int" would not upper-case to the keyword "INT"
+					final String normalizedToken = token.toUpperCase(java.util.Locale.ROOT);
+					if (KEYWORDS.contains(normalizedToken)) {
+						tokens.add(new Token(TokenType.KEYWORD, cursor, normalizedToken));
+					} else {
+						tokens.add(new Token(TokenType.IDENTIFIER, cursor, token));
+					}
+			}
+		}
+
+		return tokens;
+	}
+
+	private static int consumeEscaped(StringBuilder builder, String chars, int cursor, char delimiter) {
+		// skip the opening delimiter that the cursor points at on entry
+		cursor++;
+		for (; chars.length() > cursor; cursor++) {
+			final char curChar = chars.charAt(cursor);
+			if (curChar == delimiter && cursor + 1 < chars.length() && chars.charAt(cursor + 1) == delimiter) {
+				// a doubled delimiter is an escaped delimiter, e.g. "'Hello '' World'": keep one, skip the other
+				cursor++;
+				builder.append(curChar);
+			} else if (curChar == delimiter) {
+				break; // closing delimiter reached, the returned cursor points at it
+			} else {
+				builder.append(curChar);
+			}
+		}
+		return cursor; // equals chars.length() if the closing delimiter is missing (not reported as an error here)
+	}
+
+	private static int consumeInt(StringBuilder builder, String chars, int cursor) {
+		for (; chars.length() > cursor && isDigit(chars.charAt(cursor)); cursor++) {
+			builder.append(chars.charAt(cursor));
+		}
+		return cursor - 1;
+	}
+
+	private static int consumeIdentifier(StringBuilder builder, String chars, int cursor) {
+		for (; cursor < chars.length() && !isDelimiter(chars.charAt(cursor)); cursor++) {
+			builder.append(chars.charAt(cursor));
+		}
+		return cursor - 1;
+	}
+
+	private enum TokenType {
+		// '<' that opens a list of subtypes, e.g. "ROW<"
+		BEGIN_SUBTYPE,
+
+		// '>' that closes a list of subtypes, e.g. "ROW<..>"
+		END_SUBTYPE,
+
+		// '(' that opens a parameter list, e.g. "CHAR("
+		BEGIN_PARAMETER,
+
+		// ')' that closes a parameter list, e.g. "CHAR(...)"
+		END_PARAMETER,
+
+		// ',' between the elements of a list, e.g. "ROW<INT,"
+		LIST_SEPARATOR,
+
+		// single-quoted string, e.g. the description in "ROW<name INT 'Comment'"
+		LITERAL_STRING,
+
+		// run of decimal digits, e.g. the length in "CHAR(12"
+		LITERAL_INT,
+
+		// unquoted word that is contained in KEYWORDS (stored upper-cased), e.g. "CHAR" or "TO"
+		KEYWORD,
+
+		// backtick-quoted name or unquoted non-keyword word, e.g. "ROW<name" or "myCatalog.myDatabase"
+		IDENTIFIER,
+
+		// '.' between the parts of a qualified name, e.g. "myCatalog.myDatabase."
+		IDENTIFIER_SEPARATOR
+	}
+
+	private enum Keyword {
+		CHAR,
+		VARCHAR,
+		STRING,
+		BOOLEAN,
+		BINARY,
+		VARBINARY,
+		BYTES,
+		DECIMAL,
+		NUMERIC,
+		DEC,
+		TINYINT,
+		SMALLINT,
+		INT,
+		INTEGER,
+		BIGINT,
+		FLOAT,
+		DOUBLE,
+		PRECISION,
+		DATE,
+		TIME,
+		WITH,
+		WITHOUT,
+		LOCAL,
+		ZONE,
+		TIMESTAMP,
+		INTERVAL,
+		YEAR,
+		MONTH,
+		DAY,
+		HOUR,
+		MINUTE,
+		SECOND,
+		TO,
+		ARRAY,
+		MULTISET,
+		MAP,
+		ROW,
+		NULL,
+		ANY,
+		NOT
+	}
+
+	private static final Set<String> KEYWORDS = Stream.of(Keyword.values())
+		.map(k -> k.toString().toUpperCase())
+		.collect(Collectors.toSet());
+
+	private static class Token {
+		public final TokenType type;
+		public final int cursorPosition;
+		public final String value;
+
+		public Token(TokenType type, int cursorPosition, String value) {
+			this.type = type;
+			this.cursorPosition = cursorPosition;
+			this.value = value;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Token Parsing
+	// --------------------------------------------------------------------------------------------
+
+	private static class TokenParser {
+
+		private final String inputString;
+
+		private final List<Token> tokens;
+
+		private final ClassLoader classLoader;
+
+		private int lastValidToken;
+
+		private int currentToken;
+
+		public TokenParser(String inputString, List<Token> tokens, ClassLoader classLoader) {
+			this.inputString = inputString;
+			this.tokens = tokens;
+			this.classLoader = classLoader;
+			this.lastValidToken = -1;
+			this.currentToken = -1;
+		}
+
+		private LogicalType parseTokens() {
+			final LogicalType type = parseTypeWithNullability();
+			if (hasRemainingTokens()) {
+				nextToken();
+				throw parsingError("Unexpected token: " + token().value);
+			}
+			return type;
+		}
+
+		private int lastCursor() {
+			if (lastValidToken < 0) {
+				return 0;
+			}
+			return tokens.get(lastValidToken).cursorPosition + 1;
+		}
+
+		private ValidationException parsingError(String cause, @Nullable Throwable t) {
+			return new ValidationException(
+				String.format(
+					"Could not parse type at position %d: %s\n Input type string: %s",
+					lastCursor(),
+					cause,
+					inputString),
+				t);
+		}
+
+		private ValidationException parsingError(String cause) {
+			return parsingError(cause, null);
+		}
+
+		private boolean hasRemainingTokens() {
+			return currentToken + 1 < tokens.size();
+		}
+
+		private Token token() {
+			return tokens.get(currentToken);
+		}
+
+		private int tokenAsInt() {
+			try {
+				return Integer.valueOf(token().value);
+			} catch (NumberFormatException e) {
+				throw parsingError("Invalid integer value.", e);
+			}
+		}
+
+		private Keyword tokenAsKeyword() {
+			return Keyword.valueOf(token().value);
+		}
+
+		private String tokenAsString() {
+			return token().value;
+		}
+
+		private void nextToken() {
+			this.currentToken++;
+			if (currentToken >= tokens.size()) {
+				throw parsingError("Unexpected end.");
+			}
+			lastValidToken = this.currentToken - 1;
+		}
+
+		private void nextToken(TokenType type) {
+			nextToken();
+			final Token token = token();
+			if (token.type != type) {
+				throw parsingError("<" + type.name() + "> expected but was <" + token.type + ">.");
+			}
+		}
+
+		private void nextToken(Keyword keyword) {
+			nextToken(TokenType.KEYWORD);
+			final Token token = token();
+			if (!keyword.equals(Keyword.valueOf(token.value))) {
+				throw parsingError("Keyword '" + keyword + "' expected but was '" + token.value + "'.");
+			}
+		}
+
+		private boolean hasNextToken(TokenType... types) {
+			if (currentToken + types.length + 1 > tokens.size()) {
+				return false;
+			}
+			for (int i = 0; i < types.length; i++) {
+				final Token lookAhead = tokens.get(currentToken + i + 1);
+				if (lookAhead.type != types[i]) {
+					return false;
+				}
+			}
+			return true;
+		}
+
+		private boolean hasNextToken(Keyword... keywords) {
+			if (currentToken + keywords.length + 1 > tokens.size()) {
+				return false;
+			}
+			for (int i = 0; i < keywords.length; i++) {
+				final Token lookAhead = tokens.get(currentToken + i + 1);
+				if (lookAhead.type != TokenType.KEYWORD ||
+						keywords[i] != Keyword.valueOf(lookAhead.value)) {
+					return false;
+				}
+			}
+			return true;
+		}
+
+		private boolean parseNullability() {
+			// "NOT NULL"
+			if (hasNextToken(Keyword.NOT, Keyword.NULL)) {
+				nextToken(Keyword.NOT);
+				nextToken(Keyword.NULL);
+				return false;
+			}
+			// explicit "NULL"
+			else if (hasNextToken(Keyword.NULL)) {
+				nextToken(Keyword.NULL);
+				return true;
+			}
+			// implicit "NULL"
+			return true;
+		}
+
+		private LogicalType parseTypeWithNullability() {
+			final LogicalType logicalType;
+			if (hasNextToken(TokenType.IDENTIFIER)) {
+				logicalType = parseTypeByIdentifier().copy(parseNullability());
+			} else {
+				logicalType = parseTypeByKeyword().copy(parseNullability());
+			}
+
+			// special case: suffix notation for ARRAY and MULTISET types
+			if (hasNextToken(Keyword.ARRAY)) {
+				nextToken(Keyword.ARRAY);
+				return new ArrayType(logicalType).copy(parseNullability());
+			} else if (hasNextToken(Keyword.MULTISET)) {
+				nextToken(Keyword.MULTISET);
+				return new MultisetType(logicalType).copy(parseNullability());
+			}
+
+			return logicalType;
+		}
+
+		private LogicalType parseTypeByKeyword() {
+			nextToken(TokenType.KEYWORD);
+			switch (tokenAsKeyword()) {
+				case CHAR:
+					return parseCharType();
+				case VARCHAR:
+					return parseVarCharType();
+				case STRING:
+					return new VarCharType(VarCharType.MAX_LENGTH);
+				case BOOLEAN:
+					return new BooleanType();
+				case BINARY:
+					return parseBinaryType();
+				case VARBINARY:
+					return parseVarBinaryType();
+				case BYTES:
+					return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+				case DECIMAL:
+				case NUMERIC:
+				case DEC:
+					return parseDecimalType();
+				case TINYINT:
+					return new TinyIntType();
+				case SMALLINT:
+					return new SmallIntType();
+				case INT:
+				case INTEGER:
+					return new IntType();
+				case BIGINT:
+					return new BigIntType();
+				case FLOAT:
+					return new FloatType();
+				case DOUBLE:
+					return parseDoubleType();
+				case DATE:
+					return new DateType();
+				case TIME:
+					return parseTimeType();
+				case TIMESTAMP:
+					return parseTimestampType();
+				case INTERVAL:
+					return parseIntervalType();
+				case ARRAY:
+					return parseArrayType();
+				case MULTISET:
+					return parseMultisetType();
+				case MAP:
+					return parseMapType();
+				case ROW:
+					return parseRowType();
+				case NULL:
+					return new NullType();
+				case ANY:
+					return parseAnyType();
+				default:
+					throw parsingError("Unsupported type: " + token().value);
+			}
+		}
+
+		private LogicalType parseTypeByIdentifier() {
+			nextToken(TokenType.IDENTIFIER);
+			List<String> parts = new ArrayList<>();
+			parts.add(tokenAsString());
+			if (hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+				nextToken(TokenType.IDENTIFIER_SEPARATOR);
+				nextToken(TokenType.IDENTIFIER);
+				parts.add(tokenAsString());
+			}
+			if (hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+				nextToken(TokenType.IDENTIFIER_SEPARATOR);
+				nextToken(TokenType.IDENTIFIER);
+				parts.add(tokenAsString());
+			}
+			return new UnresolvedUserDefinedType(
+				lastPart(parts, 2),
+				lastPart(parts, 1),
+				lastPart(parts, 0));
+		}
+
+		private @Nullable String lastPart(List<String> parts, int inversePos) {
+			final int pos = parts.size() - inversePos - 1;
+			if (pos < 0) {
+				return null;
+			}
+			return parts.get(pos);
+		}
+
+		/** Parses an optional "(length)" parameter and returns -1 if no length is given. */
+		private int parseStringType() {
+			if (!hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				// implicit length
+				return -1;
+			}
+			nextToken(TokenType.BEGIN_PARAMETER);
+			nextToken(TokenType.LITERAL_INT);
+			final int explicitLength = tokenAsInt();
+			nextToken(TokenType.END_PARAMETER);
+			return explicitLength;
+		}
+
+		/** Parses the rest of a CHAR type; without an explicit length the no-argument constructor is used. */
+		private LogicalType parseCharType() {
+			final int explicitLength = parseStringType();
+			if (explicitLength >= 0) {
+				return new CharType(explicitLength);
+			}
+			return new CharType();
+		}
+
+		/** Parses the rest of a VARCHAR type; without an explicit length the no-argument constructor is used. */
+		private LogicalType parseVarCharType() {
+			final int explicitLength = parseStringType();
+			if (explicitLength >= 0) {
+				return new VarCharType(explicitLength);
+			}
+			return new VarCharType();
+		}
+
+		/** Parses the rest of a BINARY type; without an explicit length the no-argument constructor is used. */
+		private LogicalType parseBinaryType() {
+			final int explicitLength = parseStringType();
+			if (explicitLength >= 0) {
+				return new BinaryType(explicitLength);
+			}
+			return new BinaryType();
+		}
+
+		/** Parses the rest of a VARBINARY type; without an explicit length the no-argument constructor is used. */
+		private LogicalType parseVarBinaryType() {
+			final int explicitLength = parseStringType();
+			if (explicitLength >= 0) {
+				return new VarBinaryType(explicitLength);
+			}
+			return new VarBinaryType();
+		}
+
+		private LogicalType parseDecimalType() {
+			int precision = DecimalType.DEFAULT_PRECISION;
+			int scale = DecimalType.DEFAULT_SCALE;
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				nextToken(TokenType.LITERAL_INT);
+				precision = tokenAsInt();
+				if (hasNextToken(TokenType.LIST_SEPARATOR)) {
+					nextToken(TokenType.LIST_SEPARATOR);
+					nextToken(TokenType.LITERAL_INT);
+					scale = tokenAsInt();
+				}
+				nextToken(TokenType.END_PARAMETER);
+			}
+			return new DecimalType(precision, scale);
+		}
+
+		private LogicalType parseDoubleType() {
+			if (hasNextToken(Keyword.PRECISION)) {
+				nextToken(Keyword.PRECISION);
+			}
+			return new DoubleType();
+		}
+
+		private LogicalType parseTimeType() {
+			int precision = parseOptionalPrecision(TimeType.DEFAULT_PRECISION);
+			if (hasNextToken(Keyword.WITHOUT)) {
+				nextToken(Keyword.WITHOUT);
+				nextToken(Keyword.TIME);
+				nextToken(Keyword.ZONE);
+			}
+			return new TimeType(precision);
+		}
+
+		private LogicalType parseTimestampType() {
+			int precision = parseOptionalPrecision(TimestampType.DEFAULT_PRECISION);
+			if (hasNextToken(Keyword.WITHOUT)) {
+				nextToken(Keyword.WITHOUT);
+				nextToken(Keyword.TIME);
+				nextToken(Keyword.ZONE);
+			} else if (hasNextToken(Keyword.WITH)) {
+				nextToken(Keyword.WITH);
+				if (hasNextToken(Keyword.LOCAL)) {
+					nextToken(Keyword.LOCAL);
+					nextToken(Keyword.TIME);
+					nextToken(Keyword.ZONE);
+					return new LocalZonedTimestampType(precision);
+				} else {
+					nextToken(Keyword.TIME);
+					nextToken(Keyword.ZONE);
+					return new ZonedTimestampType(precision);
+				}
+			}
+			return new TimestampType(precision);
+		}
+
+		private LogicalType parseIntervalType() {
+			nextToken(TokenType.KEYWORD);
+			switch (tokenAsKeyword()) {
+				case YEAR:
+				case MONTH:
+					return parseYearMonthIntervalType();
+				case DAY:
+				case HOUR:
+				case MINUTE:
+				case SECOND:
+					return parseDayTimeIntervalType();
+				default:
+					throw parsingError("Invalid interval resolution.");
+			}
+		}
+
+		private LogicalType parseYearMonthIntervalType() {
+			int yearPrecision = YearMonthIntervalType.DEFAULT_PRECISION;
+			switch (tokenAsKeyword()) {
+				case YEAR:
+					yearPrecision = parseOptionalPrecision(yearPrecision);
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(Keyword.MONTH);
+						return new YearMonthIntervalType(
+							YearMonthResolution.YEAR_TO_MONTH,
+							yearPrecision);
+					}
+					return new YearMonthIntervalType(
+						YearMonthResolution.YEAR,
+						yearPrecision);
+				case MONTH:
+					return new YearMonthIntervalType(
+						YearMonthResolution.MONTH,
+						yearPrecision);
+				default:
+					throw parsingError("Invalid year-month interval resolution.");
+			}
+		}
+
+		private LogicalType parseDayTimeIntervalType() {
+			int dayPrecision = DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+			int fractionalPrecision = DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION;
+			switch (tokenAsKeyword()) {
+				case DAY:
+					dayPrecision = parseOptionalPrecision(dayPrecision);
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(TokenType.KEYWORD);
+						switch (tokenAsKeyword()) {
+							case HOUR:
+								return new DayTimeIntervalType(
+									DayTimeResolution.DAY_TO_HOUR,
+									dayPrecision,
+									fractionalPrecision);
+							case MINUTE:
+								return new DayTimeIntervalType(
+									DayTimeResolution.DAY_TO_MINUTE,
+									dayPrecision,
+									fractionalPrecision);
+							case SECOND:
+								fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+								return new DayTimeIntervalType(
+									DayTimeResolution.DAY_TO_SECOND,
+									dayPrecision,
+									fractionalPrecision);
+							default:
+								throw parsingError("Invalid day-time interval resolution.");
+						}
+					}
+					return new DayTimeIntervalType(
+						DayTimeResolution.DAY,
+						dayPrecision,
+						fractionalPrecision);
+				case HOUR:
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(TokenType.KEYWORD);
+						switch (tokenAsKeyword()) {
+							case MINUTE:
+								return new DayTimeIntervalType(
+									DayTimeResolution.HOUR_TO_MINUTE,
+									dayPrecision,
+									fractionalPrecision);
+							case SECOND:
+								fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+								return new DayTimeIntervalType(
+									DayTimeResolution.HOUR_TO_SECOND,
+									dayPrecision,
+									fractionalPrecision);
+							default:
+								throw parsingError("Invalid day-time interval resolution.");
+						}
+					}
+					return new DayTimeIntervalType(
+						DayTimeResolution.HOUR,
+						dayPrecision,
+						fractionalPrecision);
+				case MINUTE:
+					if (hasNextToken(Keyword.TO)) {
+						nextToken(Keyword.TO);
+						nextToken(Keyword.SECOND);
+						fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+						return new DayTimeIntervalType(
+							DayTimeResolution.MINUTE_TO_SECOND,
+							dayPrecision,
+							fractionalPrecision);
+					}
+					return new DayTimeIntervalType(
+						DayTimeResolution.MINUTE,
+						dayPrecision,
+						fractionalPrecision);
+				case SECOND:
+					fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+					return new DayTimeIntervalType(
+						DayTimeResolution.SECOND,
+						dayPrecision,
+						fractionalPrecision);
+				default:
+					throw parsingError("Invalid day-time interval resolution.");
+			}
+		}
+
+		private int parseOptionalPrecision(int defaultPrecision) {
+			int precision = defaultPrecision;
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				nextToken(TokenType.LITERAL_INT);
+				precision = tokenAsInt();
+				nextToken(TokenType.END_PARAMETER);
+			}
+			return precision;
+		}
+
+		private LogicalType parseArrayType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType elementType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new ArrayType(elementType);
+		}
+
+		private LogicalType parseMultisetType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType elementType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new MultisetType(elementType);
+		}
+
+		private LogicalType parseMapType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType keyType = parseTypeWithNullability();
+			nextToken(TokenType.LIST_SEPARATOR);
+			final LogicalType valueType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new MapType(keyType, valueType);
+		}
+
+		private LogicalType parseRowType() {
+			List<RowType.RowField> fields;
+			// SQL standard notation
+			if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				nextToken(TokenType.BEGIN_PARAMETER);
+				fields = parseRowFields(TokenType.END_PARAMETER);
+				nextToken(TokenType.END_PARAMETER);
+			} else {
+				nextToken(TokenType.BEGIN_SUBTYPE);
+				fields = parseRowFields(TokenType.END_SUBTYPE);
+				nextToken(TokenType.END_SUBTYPE);
+			}
+			return new RowType(fields);
+		}
+
+		private List<RowType.RowField> parseRowFields(TokenType endToken) {
+			List<RowType.RowField> fields = new ArrayList<>();
+			boolean isFirst = true;
+			while (!hasNextToken(endToken)) {
+				if (isFirst) {
+					isFirst = false;
+				} else {
+					nextToken(TokenType.LIST_SEPARATOR);
+				}
+				nextToken(TokenType.IDENTIFIER);
+				final String name = tokenAsString();
+				final LogicalType type = parseTypeWithNullability();
+				if (hasNextToken(TokenType.LITERAL_STRING)) {
+					nextToken(TokenType.LITERAL_STRING);
+					final String description = tokenAsString();
+					fields.add(new RowType.RowField(name, type, description));
+				} else {
+					fields.add(new RowType.RowField(name, type));
+				}
+			}
+			return fields;
+		}
+
+		@SuppressWarnings("unchecked")
+		private LogicalType parseAnyType() {
+			nextToken(TokenType.BEGIN_PARAMETER);
+			nextToken(TokenType.LITERAL_STRING);
+			final String className = tokenAsString();
+
+			nextToken(TokenType.LIST_SEPARATOR);
+			nextToken(TokenType.LITERAL_STRING);
+			final String serializer = tokenAsString();
+			nextToken(TokenType.END_PARAMETER);
+
+			try {
+				final Class<?> clazz = Class.forName(className, true, classLoader);
+				final byte[] bytes = EncodingUtils.decodeBase64ToBytes(serializer);
+				final DataInputDeserializer inputDeserializer = new DataInputDeserializer(bytes);
+				final TypeSerializerSnapshot<?> snapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+					inputDeserializer,
+					classLoader);
+				return new AnyType(clazz, snapshot.restoreSerializer());
+			} catch (Throwable t) {
+				throw parsingError(
+					"Unable to restore the ANY type of class '" + className + "' with " +
+						"serializer snapshot '" + serializer + "'.", t);
+			}
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 95d10e3..ca88427 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -110,12 +110,16 @@ public abstract class EncodingUtils {
 		return new String(java.util.Base64.getEncoder().encode(bytes), UTF_8);
 	}
 
+	public static byte[] decodeBase64ToBytes(String base64) {
+		return java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8));
+	}
+
 	public static String encodeStringToBase64(String string) {
 		return encodeBytesToBase64(string.getBytes(UTF_8));
 	}
 
 	public static String decodeBase64ToString(String base64) {
-		return new String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+		return new String(decodeBase64ToBytes(base64), UTF_8);
 	}
 
 	public static byte[] md5(String string) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
index faf26d0..6bf414a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
@@ -32,13 +32,21 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
-  * Utilities to convert {@link TypeInformation} into a string representation and back.
-  */
+ * Utilities to convert {@link TypeInformation} into a string representation and back.
+ *
+ * @deprecated This utility is based on {@link TypeInformation}. However, the Table &amp; SQL API
+ *             is currently being updated to use {@link DataType}s based on {@link LogicalType}s.
+ *             Use {@link LogicalTypeParser} instead.
+ */
+@Deprecated
 @PublicEvolving
 public class TypeStringUtils {
 
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
new file mode 100644
index 0000000..df55424
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.UNRESOLVED;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeParser}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeParserTest {
+
+	/**
+	 * Provides one {@link TestSpec} per parameterized run. A spec either declares the type that
+	 * parsing its string must produce or a fragment of the expected error message.
+	 *
+	 * <p>Every run receives exactly one argument (the spec), so the display name can only refer to
+	 * {@code {0}}; a second placeholder would be printed literally.
+	 */
+	@Parameters(name = "{index}: {0}")
+	public static List<TestSpec> testData() {
+		return Arrays.asList(
+
+			TestSpec
+				.forString("CHAR")
+				.expectType(new CharType()),
+
+			TestSpec
+				.forString("CHAR NOT NULL")
+				.expectType(new CharType().copy(false)),
+
+			TestSpec
+				.forString("CHAR   NOT \t\nNULL")
+				.expectType(new CharType().copy(false)),
+
+			TestSpec
+				.forString("char not null")
+				.expectType(new CharType().copy(false)),
+
+			TestSpec
+				.forString("CHAR NULL")
+				.expectType(new CharType()),
+
+			TestSpec
+				.forString("CHAR(33)")
+				.expectType(new CharType(33)),
+
+			TestSpec
+				.forString("VARCHAR")
+				.expectType(new VarCharType()),
+
+			TestSpec
+				.forString("VARCHAR(33)")
+				.expectType(new VarCharType(33)),
+
+			TestSpec
+				.forString("STRING")
+				.expectType(new VarCharType(VarCharType.MAX_LENGTH)),
+
+			TestSpec
+				.forString("BOOLEAN")
+				.expectType(new BooleanType()),
+
+			TestSpec
+				.forString("BINARY")
+				.expectType(new BinaryType()),
+
+			TestSpec
+				.forString("BINARY(33)")
+				.expectType(new BinaryType(33)),
+
+			TestSpec
+				.forString("VARBINARY")
+				.expectType(new VarBinaryType()),
+
+			TestSpec
+				.forString("VARBINARY(33)")
+				.expectType(new VarBinaryType(33)),
+
+			TestSpec
+				.forString("BYTES")
+				.expectType(new VarBinaryType(VarBinaryType.MAX_LENGTH)),
+
+			TestSpec
+				.forString("DECIMAL")
+				.expectType(new DecimalType()),
+
+			TestSpec
+				.forString("DEC")
+				.expectType(new DecimalType()),
+
+			TestSpec
+				.forString("NUMERIC")
+				.expectType(new DecimalType()),
+
+			TestSpec
+				.forString("DECIMAL(10)")
+				.expectType(new DecimalType(10)),
+
+			TestSpec
+				.forString("DEC(10)")
+				.expectType(new DecimalType(10)),
+
+			TestSpec
+				.forString("NUMERIC(10)")
+				.expectType(new DecimalType(10)),
+
+			TestSpec
+				.forString("DECIMAL(10, 3)")
+				.expectType(new DecimalType(10, 3)),
+
+			TestSpec
+				.forString("DEC(10, 3)")
+				.expectType(new DecimalType(10, 3)),
+
+			TestSpec
+				.forString("NUMERIC(10, 3)")
+				.expectType(new DecimalType(10, 3)),
+
+			TestSpec
+				.forString("TINYINT")
+				.expectType(new TinyIntType()),
+
+			TestSpec
+				.forString("SMALLINT")
+				.expectType(new SmallIntType()),
+
+			TestSpec
+				.forString("INTEGER")
+				.expectType(new IntType()),
+
+			TestSpec
+				.forString("INT")
+				.expectType(new IntType()),
+
+			TestSpec
+				.forString("BIGINT")
+				.expectType(new BigIntType()),
+
+			TestSpec
+				.forString("FLOAT")
+				.expectType(new FloatType()),
+
+			TestSpec
+				.forString("DOUBLE")
+				.expectType(new DoubleType()),
+
+			TestSpec
+				.forString("DOUBLE PRECISION")
+				.expectType(new DoubleType()),
+
+			TestSpec
+				.forString("DATE")
+				.expectType(new DateType()),
+
+			TestSpec
+				.forString("TIME")
+				.expectType(new TimeType()),
+
+			TestSpec
+				.forString("TIME(3)")
+				.expectType(new TimeType(3)),
+
+			TestSpec
+				.forString("TIME WITHOUT TIME ZONE")
+				.expectType(new TimeType()),
+
+			TestSpec
+				.forString("TIME(3) WITHOUT TIME ZONE")
+				.expectType(new TimeType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP")
+				.expectType(new TimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3)")
+				.expectType(new TimestampType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP WITHOUT TIME ZONE")
+				.expectType(new TimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3) WITHOUT TIME ZONE")
+				.expectType(new TimestampType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP WITH TIME ZONE")
+				.expectType(new ZonedTimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3) WITH TIME ZONE")
+				.expectType(new ZonedTimestampType(3)),
+
+			TestSpec
+				.forString("TIMESTAMP WITH LOCAL TIME ZONE")
+				.expectType(new LocalZonedTimestampType()),
+
+			TestSpec
+				.forString("TIMESTAMP(3) WITH LOCAL TIME ZONE")
+				.expectType(new LocalZonedTimestampType(3)),
+
+			TestSpec
+				.forString("INTERVAL YEAR")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR)),
+
+			TestSpec
+				.forString("INTERVAL YEAR(4)")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR, 4)),
+
+			TestSpec
+				.forString("INTERVAL MONTH")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.MONTH)),
+
+			TestSpec
+				.forString("INTERVAL YEAR TO MONTH")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH)),
+
+			TestSpec
+				.forString("INTERVAL YEAR(4) TO MONTH")
+				.expectType(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, 4)),
+
+			TestSpec
+				.forString("INTERVAL DAY(2) TO SECOND(3)")
+				.expectType(new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, 2, 3)),
+
+			TestSpec
+				.forString("INTERVAL HOUR TO SECOND(3)")
+				.expectType(
+					new DayTimeIntervalType(
+						DayTimeResolution.HOUR_TO_SECOND,
+						DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+						3)
+				),
+
+			TestSpec
+				.forString("INTERVAL MINUTE")
+				.expectType(new DayTimeIntervalType(DayTimeResolution.MINUTE)),
+
+			TestSpec
+				.forString("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+				.expectType(new ArrayType(new LocalZonedTimestampType(3))),
+
+			TestSpec
+				.forString("ARRAY<INT NOT NULL>")
+				.expectType(new ArrayType(new IntType(false))),
+
+			TestSpec
+				.forString("INT ARRAY")
+				.expectType(new ArrayType(new IntType())),
+
+			TestSpec
+				.forString("INT NOT NULL ARRAY")
+				.expectType(new ArrayType(new IntType(false))),
+
+			TestSpec
+				.forString("INT ARRAY NOT NULL")
+				.expectType(new ArrayType(false, new IntType())),
+
+			TestSpec
+				.forString("MULTISET<INT NOT NULL>")
+				.expectType(new MultisetType(new IntType(false))),
+
+			TestSpec
+				.forString("INT MULTISET")
+				.expectType(new MultisetType(new IntType())),
+
+			TestSpec
+				.forString("INT NOT NULL MULTISET")
+				.expectType(new MultisetType(new IntType(false))),
+
+			TestSpec
+				.forString("INT MULTISET NOT NULL")
+				.expectType(new MultisetType(false, new IntType())),
+
+			TestSpec
+				.forString("MAP<BIGINT, BOOLEAN>")
+				.expectType(new MapType(new BigIntType(), new BooleanType())),
+
+			TestSpec
+				.forString("ROW<f0 INT NOT NULL, f1 BOOLEAN>")
+				.expectType(
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("f0", new IntType(false)),
+							new RowType.RowField("f1", new BooleanType())))
+				),
+
+			TestSpec
+				.forString("ROW(f0 INT NOT NULL, f1 BOOLEAN)")
+				.expectType(
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("f0", new IntType(false)),
+							new RowType.RowField("f1", new BooleanType())))
+				),
+
+			TestSpec
+				.forString("ROW<`f0` INT>")
+				.expectType(
+					new RowType(
+						Collections.singletonList(new RowType.RowField("f0", new IntType())))
+				),
+
+			TestSpec
+				.forString("ROW(`f0` INT)")
+				.expectType(
+					new RowType(
+						Collections.singletonList(new RowType.RowField("f0", new IntType())))
+				),
+
+			TestSpec
+				.forString("ROW<>")
+				.expectType(new RowType(Collections.emptyList())),
+
+			TestSpec
+				.forString("ROW()")
+				.expectType(new RowType(Collections.emptyList())),
+
+			TestSpec
+				.forString("ROW<f0 INT NOT NULL 'This is a comment.', f1 BOOLEAN 'This as well.'>")
+				.expectType(
+					new RowType(
+						Arrays.asList(
+							new RowType.RowField("f0", new IntType(false), "This is a comment."),
+							new RowType.RowField("f1", new BooleanType(), "This as well.")))
+				),
+
+			TestSpec
+				.forString("NULL")
+				.expectType(new NullType()),
+
+			TestSpec
+				.forString(createAnyType(LogicalTypeParserTest.class).asSerializableString())
+				.expectType(createAnyType(LogicalTypeParserTest.class)),
+
+			TestSpec
+				.forString("cat.db.MyType")
+				.expectType(new UnresolvedUserDefinedType("cat", "db", "MyType")),
+
+			TestSpec
+				.forString("`db`.`MyType`")
+				.expectType(new UnresolvedUserDefinedType(null, "db", "MyType")),
+
+			TestSpec
+				.forString("MyType")
+				.expectType(new UnresolvedUserDefinedType(null, null, "MyType")),
+
+			TestSpec
+				.forString("ARRAY<MyType>")
+				.expectType(new ArrayType(new UnresolvedUserDefinedType(null, null, "MyType"))),
+
+			TestSpec
+				.forString("ROW<f0 MyType, f1 `c`.`d`.`t`>")
+				.expectType(
+					RowType.of(
+						new UnresolvedUserDefinedType(null, null, "MyType"),
+						new UnresolvedUserDefinedType("c", "d", "t"))
+				),
+
+			// error message testing
+
+			TestSpec
+				.forString("ROW<`f0")
+				.expectErrorMessage("Unexpected end"),
+
+			TestSpec
+				.forString("ROW<`f0`")
+				.expectErrorMessage("Unexpected end"),
+
+			TestSpec
+				.forString("VARCHAR(test)")
+				.expectErrorMessage("<LITERAL_INT> expected"),
+
+			TestSpec
+				.forString("VARCHAR(33333333333)")
+				.expectErrorMessage("Invalid integer value"),
+
+			TestSpec
+				.forString("ROW<field INT, field2>")
+				.expectErrorMessage("<KEYWORD> expected"),
+
+			TestSpec
+				.forString("ANY('unknown.class', '')")
+				.expectErrorMessage("Unable to restore the ANY type")
+		);
+	}
+
+	// Injected by the Parameterized runner with one element of testData() per run.
+	@Parameter
+	public TestSpec testSpec;
+
+	// Used by testErrorMessage() to declare the expected exception type and message.
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	/**
+	 * Parses the spec's type string and compares the result with the expected type. Specs that
+	 * declare no expected type are ignored by this test.
+	 */
+	@Test
+	public void testParsing() {
+		final LogicalType expected = testSpec.expectedType;
+		if (expected == null) {
+			return;
+		}
+		final LogicalType parsed = LogicalTypeParser.parse(testSpec.typeString);
+		assertThat(parsed, equalTo(expected));
+	}
+
+	/**
+	 * Round-trips the expected type through its serializable string and the parser.
+	 *
+	 * <p>Specs without an expected type are ignored, as are types whose root or whose direct
+	 * children have the UNRESOLVED root.
+	 */
+	@Test
+	public void testSerializableParsing() {
+		final LogicalType expected = testSpec.expectedType;
+		if (expected == null || hasRoot(expected, UNRESOLVED)) {
+			return;
+		}
+		// only direct children are inspected, deeper nesting is not checked
+		final boolean hasUnresolvedChild = expected.getChildren()
+			.stream()
+			.anyMatch(child -> hasRoot(child, UNRESOLVED));
+		if (hasUnresolvedChild) {
+			return;
+		}
+		final String serialized = expected.asSerializableString();
+		assertThat(LogicalTypeParser.parse(serialized), equalTo(expected));
+	}
+
+	/**
+	 * Expects parsing to fail with a {@link ValidationException} containing the spec's error
+	 * message. Specs that declare no error message are ignored by this test.
+	 */
+	@Test
+	public void testErrorMessage() {
+		final String expectedMessage = testSpec.expectedErrorMessage;
+		if (expectedMessage == null) {
+			return;
+		}
+
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(expectedMessage);
+
+		LogicalTypeParser.parse(testSpec.typeString);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Describes a single parser test case: the string to parse plus either the type it must
+	 * produce or a fragment of the error message it must raise.
+	 */
+	private static class TestSpec {
+
+		private final String typeString;
+
+		private @Nullable LogicalType expectedType;
+
+		private @Nullable String expectedErrorMessage;
+
+		private TestSpec(String typeString) {
+			this.typeString = typeString;
+		}
+
+		/** Starts a spec for the given type string. */
+		static TestSpec forString(String typeString) {
+			return new TestSpec(typeString);
+		}
+
+		/** Declares the type that parsing is expected to produce. */
+		TestSpec expectType(LogicalType expectedType) {
+			this.expectedType = expectedType;
+			return this;
+		}
+
+		/** Declares a fragment of the error message that parsing is expected to raise. */
+		TestSpec expectErrorMessage(String expectedErrorMessage) {
+			this.expectedErrorMessage = expectedErrorMessage;
+			return this;
+		}
+
+		/**
+		 * Returns the input type string. The parameterized test name renders the spec through
+		 * its {@code {0}} placeholder; without this override it would show the default
+		 * {@code ClassName@hash} text.
+		 */
+		@Override
+		public String toString() {
+			return typeString;
+		}
+	}
+
+	/** Creates an {@link AnyType} for the given class backed by a fresh {@link KryoSerializer}. */
+	private static <T> AnyType<T> createAnyType(Class<T> clazz) {
+		final KryoSerializer<T> serializer = new KryoSerializer<>(clazz, new ExecutionConfig());
+		return new AnyType<>(clazz, serializer);
+	}
+}