You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/18 15:32:04 UTC
[flink] 06/06: [FLINK-13078][table-common] Add a logical type parser
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 479fb070edc4a681d85e4c20a964083751aa3720
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 10 08:52:30 2019 +0200
[FLINK-13078][table-common] Add a logical type parser
This adds a parser for all logical types defined in FLIP-37.
This closes #9061.
---
.../flink/table/types/logical/LogicalType.java | 3 +
.../types/logical/utils/LogicalTypeParser.java | 900 +++++++++++++++++++++
.../apache/flink/table/utils/EncodingUtils.java | 6 +-
.../apache/flink/table/utils/TypeStringUtils.java | 12 +-
.../flink/table/types/LogicalTypeParserTest.java | 519 ++++++++++++
5 files changed, 1437 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 4e4942a..cc46533 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
@@ -98,6 +99,8 @@ public abstract class LogicalType implements Serializable {
* Returns a string that fully serializes this instance. The serialized string can be used for
* transmitting or persisting a type.
*
+ * <p>See {@link LogicalTypeParser} for the reverse operation.
+ *
* @return detailed string for transmission or persistence
*/
public abstract String asSerializableString();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
new file mode 100644
index 0000000..b6fcd07
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Parser for creating instances of {@link LogicalType} from a serialized string created with
+ * {@link LogicalType#asSerializableString()}.
+ *
+ * <p>In addition to the serializable string representations, this parser also supports common
+ * shortcuts for certain types. This includes:
+ * <ul>
+ * <li>{@code STRING} as a synonym for {@code VARCHAR(INT_MAX)}</li>
+ * <li>{@code BYTES} as a synonym for {@code VARBINARY(INT_MAX)}</li>
+ * <li>{@code NUMERIC} and {@code DEC} as synonyms for {@code DECIMAL}</li>
+ * <li>{@code INTEGER} as a synonym for {@code INT}</li>
+ * <li>{@code DOUBLE PRECISION} as a synonym for {@code DOUBLE}</li>
+ * <li>{@code TIME WITHOUT TIME ZONE} as a synonym for {@code TIME}</li>
+ * <li>{@code TIMESTAMP WITHOUT TIME ZONE} as a synonym for {@code TIMESTAMP}</li>
+ * <li>{@code type ARRAY} as a synonym for {@code ARRAY<type>}</li>
+ * <li>{@code type MULTISET} as a synonym for {@code MULTISET<type>}</li>
+ * <li>{@code ROW(...)} as a synonym for {@code ROW<...>}</li>
+ * <li>{@code type NULL} as a synonym for {@code type}</li>
+ * </ul>
+ *
+ * <p>Furthermore, it returns {@link UnresolvedUserDefinedType} for unknown types (partially or fully
+ * qualified such as {@code [catalog].[database].[type]}).
+ */
+@PublicEvolving
+public final class LogicalTypeParser {
+
+	/**
+	 * Parses a type string into a {@link LogicalType}. All types are fully resolved except for
+	 * {@link UnresolvedUserDefinedType}s.
+	 *
+	 * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+	 * @param classLoader class loader that is used for loading the classes of ANY types
+	 * @return the parsed type
+	 * @throws ValidationException in case of parsing errors.
+	 */
+	public static LogicalType parse(String typeString, ClassLoader classLoader) {
+		return new TokenParser(typeString, tokenize(typeString), classLoader).parseTokens();
+	}
+
+	/**
+	 * Parses a type string into a {@link LogicalType} and uses the context class loader of the
+	 * current thread for loading the classes of ANY types. All types are fully resolved except
+	 * for {@link UnresolvedUserDefinedType}s.
+	 *
+	 * @param typeString a string like "ROW(field1 INT, field2 BOOLEAN)"
+	 * @return the parsed type
+	 * @throws ValidationException in case of parsing errors.
+	 */
+	public static LogicalType parse(String typeString) {
+		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+		return parse(typeString, contextClassLoader);
+	}
+
+ // --------------------------------------------------------------------------------------------
+ // Tokenizer
+ // --------------------------------------------------------------------------------------------
+
+	private static final char CHAR_BEGIN_SUBTYPE = '<';
+	private static final char CHAR_END_SUBTYPE = '>';
+	private static final char CHAR_BEGIN_PARAMETER = '(';
+	private static final char CHAR_END_PARAMETER = ')';
+	private static final char CHAR_LIST_SEPARATOR = ',';
+	private static final char CHAR_STRING = '\'';
+	private static final char CHAR_IDENTIFIER = '`';
+	private static final char CHAR_DOT = '.';
+
+	/**
+	 * Returns whether the given character ends an unquoted word (keyword or identifier). This is
+	 * the case for whitespace and for all structural characters except the quote characters.
+	 */
+	private static boolean isDelimiter(char character) {
+		if (Character.isWhitespace(character)) {
+			return true;
+		}
+		switch (character) {
+			case CHAR_BEGIN_SUBTYPE:
+			case CHAR_END_SUBTYPE:
+			case CHAR_BEGIN_PARAMETER:
+			case CHAR_END_PARAMETER:
+			case CHAR_LIST_SEPARATOR:
+			case CHAR_DOT:
+				return true;
+			default:
+				return false;
+		}
+	}
+
+	/** Returns whether the character is an ASCII decimal digit ({@code '0'} to {@code '9'}). */
+	private static boolean isDigit(char c) {
+		return '0' <= c && c <= '9';
+	}
+
+	/**
+	 * Splits the given type string into a list of tokens.
+	 *
+	 * <p>Structural characters ({@code < > ( ) , .}) become single-character tokens. Quoted strings
+	 * ({@code '...'}) and quoted identifiers ({@code `...`}) are unescaped. Digit sequences become
+	 * integer literals. Every other word becomes either a {@link Keyword} (matched
+	 * case-insensitively and stored upper-cased) or an identifier (stored as written). Whitespace
+	 * between tokens is skipped.
+	 *
+	 * <p>Each token records a cursor position at its end, which is used to locate parsing errors.
+	 */
+	private static List<Token> tokenize(String chars) {
+		final List<Token> tokens = new ArrayList<>();
+		final StringBuilder builder = new StringBuilder();
+		for (int cursor = 0; cursor < chars.length(); cursor++) {
+			char curChar = chars.charAt(cursor);
+			switch (curChar) {
+				case CHAR_BEGIN_SUBTYPE:
+					tokens.add(new Token(TokenType.BEGIN_SUBTYPE, cursor, Character.toString(CHAR_BEGIN_SUBTYPE)));
+					break;
+				case CHAR_END_SUBTYPE:
+					tokens.add(new Token(TokenType.END_SUBTYPE, cursor, Character.toString(CHAR_END_SUBTYPE)));
+					break;
+				case CHAR_BEGIN_PARAMETER:
+					tokens.add(new Token(TokenType.BEGIN_PARAMETER, cursor, Character.toString(CHAR_BEGIN_PARAMETER)));
+					break;
+				case CHAR_END_PARAMETER:
+					tokens.add(new Token(TokenType.END_PARAMETER, cursor, Character.toString(CHAR_END_PARAMETER)));
+					break;
+				case CHAR_LIST_SEPARATOR:
+					tokens.add(new Token(TokenType.LIST_SEPARATOR, cursor, Character.toString(CHAR_LIST_SEPARATOR)));
+					break;
+				case CHAR_DOT:
+					tokens.add(new Token(TokenType.IDENTIFIER_SEPARATOR, cursor, Character.toString(CHAR_DOT)));
+					break;
+				case CHAR_STRING:
+					builder.setLength(0);
+					cursor = consumeEscaped(builder, chars, cursor, CHAR_STRING);
+					tokens.add(new Token(TokenType.LITERAL_STRING, cursor, builder.toString()));
+					break;
+				case CHAR_IDENTIFIER:
+					builder.setLength(0);
+					cursor = consumeEscaped(builder, chars, cursor, CHAR_IDENTIFIER);
+					tokens.add(new Token(TokenType.IDENTIFIER, cursor, builder.toString()));
+					break;
+				default:
+					if (Character.isWhitespace(curChar)) {
+						continue;
+					}
+					builder.setLength(0);
+					if (isDigit(curChar)) {
+						cursor = consumeInt(builder, chars, cursor);
+						tokens.add(new Token(TokenType.LITERAL_INT, cursor, builder.toString()));
+						break;
+					}
+					cursor = consumeIdentifier(builder, chars, cursor);
+					final String token = builder.toString();
+					// normalize with a fixed locale: with e.g. a Turkish default locale,
+					// "int".toUpperCase() returns "İNT" and the keyword would not be recognized
+					final String normalizedToken = token.toUpperCase(java.util.Locale.ROOT);
+					if (KEYWORDS.contains(normalizedToken)) {
+						tokens.add(new Token(TokenType.KEYWORD, cursor, normalizedToken));
+					} else {
+						tokens.add(new Token(TokenType.IDENTIFIER, cursor, token));
+					}
+			}
+		}
+		return tokens;
+	}
+
+	/**
+	 * Reads a quoted section that starts with the given delimiter at {@code cursor} and appends its
+	 * unescaped content to the builder. Inside the section, a doubled delimiter stands for a single
+	 * delimiter character, e.g. {@code 'Hello '' World'}.
+	 *
+	 * @return the index of the closing delimiter
+	 * @throws ValidationException if the section is not closed by a delimiter
+	 */
+	private static int consumeEscaped(StringBuilder builder, String chars, int cursor, char delimiter) {
+		final int start = cursor;
+		// skip the opening delimiter
+		cursor++;
+		for (; cursor < chars.length(); cursor++) {
+			final char curChar = chars.charAt(cursor);
+			if (curChar != delimiter) {
+				builder.append(curChar);
+			} else if (cursor + 1 < chars.length() && chars.charAt(cursor + 1) == delimiter) {
+				// escaping of the escaping char e.g. "'Hello '' World'"
+				builder.append(curChar);
+				cursor++;
+			} else {
+				return cursor;
+			}
+		}
+		// the input ended before a closing delimiter, e.g. "ROW<f INT 'comment>"
+		throw new ValidationException(
+			String.format(
+				"Could not parse type at position %d: Unterminated literal or identifier starting with %s.\n Input type string: %s",
+				start,
+				delimiter,
+				chars));
+	}
+
+	/**
+	 * Appends the run of ASCII digits starting at {@code cursor} to the builder.
+	 *
+	 * @return the index of the last consumed character
+	 */
+	private static int consumeInt(StringBuilder builder, String chars, int cursor) {
+		for (; chars.length() > cursor && isDigit(chars.charAt(cursor)); cursor++) {
+			builder.append(chars.charAt(cursor));
+		}
+		return cursor - 1;
+	}
+
+	/**
+	 * Appends all characters from {@code cursor} up to (excluding) the next delimiter or the end
+	 * of the input to the builder.
+	 *
+	 * @return the index of the last consumed character
+	 */
+	private static int consumeIdentifier(StringBuilder builder, String chars, int cursor) {
+		for (; cursor < chars.length() && !isDelimiter(chars.charAt(cursor)); cursor++) {
+			builder.append(chars.charAt(cursor));
+		}
+		return cursor - 1;
+	}
+
+	/** Kinds of tokens produced by the tokenizer. */
+	private enum TokenType {
+		/** Opening angle bracket, e.g. in {@code ROW<}. */
+		BEGIN_SUBTYPE,
+		/** Closing angle bracket, e.g. in {@code ROW<...>}. */
+		END_SUBTYPE,
+		/** Opening parenthesis, e.g. in {@code CHAR(}. */
+		BEGIN_PARAMETER,
+		/** Closing parenthesis, e.g. in {@code CHAR(...)}. */
+		END_PARAMETER,
+		/** Comma between list elements, e.g. in {@code MAP<INT,}. */
+		LIST_SEPARATOR,
+		/** Single-quoted string, e.g. the comment in {@code ROW<name INT 'Comment'}. */
+		LITERAL_STRING,
+		/** Unsigned integer, e.g. in {@code CHAR(12}. */
+		LITERAL_INT,
+		/** Reserved word, e.g. {@code CHAR} or {@code TO}. */
+		KEYWORD,
+		/** Plain or backtick-quoted name, e.g. in {@code ROW<name} or {@code myCatalog.myDatabase}. */
+		IDENTIFIER,
+		/** Dot between the parts of a qualified name, e.g. in {@code myCatalog.myDatabase.}. */
+		IDENTIFIER_SEPARATOR
+	}
+
+	/**
+	 * Reserved words of the type grammar. The tokenizer matches them case-insensitively. The
+	 * declaration order is kept stable.
+	 */
+	private enum Keyword {
+		// character strings, boolean, binary strings
+		CHAR, VARCHAR, STRING, BOOLEAN, BINARY, VARBINARY, BYTES,
+		// exact and approximate numerics
+		DECIMAL, NUMERIC, DEC, TINYINT, SMALLINT, INT, INTEGER, BIGINT, FLOAT, DOUBLE, PRECISION,
+		// date, time, timestamp
+		DATE, TIME, WITH, WITHOUT, LOCAL, ZONE, TIMESTAMP,
+		// intervals
+		INTERVAL, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TO,
+		// constructed types, other types, nullability
+		ARRAY, MULTISET, MAP, ROW, NULL, ANY, NOT
+	}
+
+	/** Names of all {@link Keyword}s (upper case), used for classifying words during tokenization. */
+	private static final Set<String> KEYWORDS = Stream.of(Keyword.values())
+		.map(Keyword::name)
+		.collect(Collectors.toSet());
+
+	/** A single token of a type string. */
+	private static class Token {
+		/** Kind of this token. */
+		public final TokenType type;
+		/** Position in the input string; used for locating parsing errors. */
+		public final int cursorPosition;
+		/** Token text: unescaped for quoted tokens and upper-cased for keywords. */
+		public final String value;
+
+		public Token(TokenType tokenType, int position, String tokenValue) {
+			type = tokenType;
+			cursorPosition = position;
+			value = tokenValue;
+		}
+	}
+
+ // --------------------------------------------------------------------------------------------
+ // Token Parsing
+ // --------------------------------------------------------------------------------------------
+
+ private static class TokenParser {
+
+ private final String inputString;
+
+ private final List<Token> tokens;
+
+ private final ClassLoader classLoader;
+
+ private int lastValidToken;
+
+ private int currentToken;
+
+ public TokenParser(String inputString, List<Token> tokens, ClassLoader classLoader) {
+ this.inputString = inputString;
+ this.tokens = tokens;
+ this.classLoader = classLoader;
+ this.lastValidToken = -1;
+ this.currentToken = -1;
+ }
+
+		/**
+		 * Parses the complete token list into a single type.
+		 *
+		 * @throws ValidationException if the tokens do not form a type or tokens remain afterwards
+		 */
+		private LogicalType parseTokens() {
+			final LogicalType type = parseTypeWithNullability();
+			if (!hasRemainingTokens()) {
+				return type;
+			}
+			// advance to the first superfluous token so that it can be reported
+			nextToken();
+			throw parsingError("Unexpected token: " + token().value);
+		}
+
+ private int lastCursor() {
+ if (lastValidToken < 0) {
+ return 0;
+ }
+ return tokens.get(lastValidToken).cursorPosition + 1;
+ }
+
+ private ValidationException parsingError(String cause, @Nullable Throwable t) {
+ return new ValidationException(
+ String.format(
+ "Could not parse type at position %d: %s\n Input type string: %s",
+ lastCursor(),
+ cause,
+ inputString),
+ t);
+ }
+
+ private ValidationException parsingError(String cause) {
+ return parsingError(cause, null);
+ }
+
+		/** Returns whether there is at least one token after the current one. */
+		private boolean hasRemainingTokens() {
+			return currentToken + 1 < tokens.size();
+		}
+
+		/** Returns the current token, i.e. the token that has been consumed last. */
+		private Token token() {
+			return tokens.get(currentToken);
+		}
+
+		/**
+		 * Returns the value of the current token as an int.
+		 *
+		 * @throws ValidationException if the value is not a valid int, e.g. it exceeds the int range
+		 */
+		private int tokenAsInt() {
+			try {
+				// parseInt returns the primitive directly instead of boxing like Integer.valueOf
+				return Integer.parseInt(token().value);
+			} catch (NumberFormatException e) {
+				throw parsingError("Invalid integer value.", e);
+			}
+		}
+
+		/** Returns the current token as a {@link Keyword}; only valid for keyword tokens. */
+		private Keyword tokenAsKeyword() {
+			return Keyword.valueOf(token().value);
+		}
+
+		/** Returns the (unescaped) text of the current token. */
+		private String tokenAsString() {
+			return token().value;
+		}
+
+		/**
+		 * Advances to the next token.
+		 *
+		 * @throws ValidationException if the input has no more tokens
+		 */
+		private void nextToken() {
+			currentToken++;
+			if (currentToken < tokens.size()) {
+				lastValidToken = currentToken - 1;
+				return;
+			}
+			throw parsingError("Unexpected end.");
+		}
+
+		/**
+		 * Advances to the next token and checks that it is of the given type.
+		 *
+		 * @throws ValidationException if the input has no more tokens or the type does not match
+		 */
+		private void nextToken(TokenType type) {
+			nextToken();
+			final TokenType actualType = token().type;
+			if (actualType == type) {
+				return;
+			}
+			throw parsingError("<" + type.name() + "> expected but was <" + actualType + ">.");
+		}
+
+		/**
+		 * Advances to the next token and checks that it is the given keyword.
+		 *
+		 * @throws ValidationException if the next token is not exactly this keyword
+		 */
+		private void nextToken(Keyword keyword) {
+			nextToken(TokenType.KEYWORD);
+			final String actualValue = token().value;
+			if (Keyword.valueOf(actualValue) != keyword) {
+				throw parsingError("Keyword '" + keyword + "' expected but was '" + actualValue + "'.");
+			}
+		}
+
+ private boolean hasNextToken(TokenType... types) {
+ if (currentToken + types.length + 1 > tokens.size()) {
+ return false;
+ }
+ for (int i = 0; i < types.length; i++) {
+ final Token lookAhead = tokens.get(currentToken + i + 1);
+ if (lookAhead.type != types[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean hasNextToken(Keyword... keywords) {
+ if (currentToken + keywords.length + 1 > tokens.size()) {
+ return false;
+ }
+ for (int i = 0; i < keywords.length; i++) {
+ final Token lookAhead = tokens.get(currentToken + i + 1);
+ if (lookAhead.type != TokenType.KEYWORD ||
+ keywords[i] != Keyword.valueOf(lookAhead.value)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+		/**
+		 * Consumes an optional nullability suffix.
+		 *
+		 * @return false for an explicit "NOT NULL"; true for an explicit "NULL" or no suffix
+		 */
+		private boolean parseNullability() {
+			if (hasNextToken(Keyword.NOT, Keyword.NULL)) {
+				nextToken(Keyword.NOT);
+				nextToken(Keyword.NULL);
+				return false;
+			}
+			if (hasNextToken(Keyword.NULL)) {
+				// an explicit "NULL" means the same as no suffix
+				nextToken(Keyword.NULL);
+			}
+			return true;
+		}
+
+		/**
+		 * Parses a type followed by an optional nullability suffix ("NULL" or "NOT NULL").
+		 *
+		 * <p>Also supports the SQL suffix notation for collections, e.g. {@code INT NOT NULL ARRAY}
+		 * for {@code ARRAY<INT NOT NULL>}. Suffixes may be repeated and are applied from left to
+		 * right, e.g. {@code INT ARRAY MULTISET} is parsed as {@code MULTISET<ARRAY<INT>>}. Each
+		 * suffix may carry its own nullability.
+		 */
+		private LogicalType parseTypeWithNullability() {
+			LogicalType logicalType;
+			if (hasNextToken(TokenType.IDENTIFIER)) {
+				logicalType = parseTypeByIdentifier().copy(parseNullability());
+			} else {
+				logicalType = parseTypeByKeyword().copy(parseNullability());
+			}
+
+			// special case: (possibly nested) suffix notation for ARRAY and MULTISET types
+			while (true) {
+				if (hasNextToken(Keyword.ARRAY)) {
+					nextToken(Keyword.ARRAY);
+					logicalType = new ArrayType(logicalType).copy(parseNullability());
+				} else if (hasNextToken(Keyword.MULTISET)) {
+					nextToken(Keyword.MULTISET);
+					logicalType = new MultisetType(logicalType).copy(parseNullability());
+				} else {
+					return logicalType;
+				}
+			}
+		}
+
+ private LogicalType parseTypeByKeyword() {
+ nextToken(TokenType.KEYWORD);
+ switch (tokenAsKeyword()) {
+ case CHAR:
+ return parseCharType();
+ case VARCHAR:
+ return parseVarCharType();
+ case STRING:
+ return new VarCharType(VarCharType.MAX_LENGTH);
+ case BOOLEAN:
+ return new BooleanType();
+ case BINARY:
+ return parseBinaryType();
+ case VARBINARY:
+ return parseVarBinaryType();
+ case BYTES:
+ return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+ case DECIMAL:
+ case NUMERIC:
+ case DEC:
+ return parseDecimalType();
+ case TINYINT:
+ return new TinyIntType();
+ case SMALLINT:
+ return new SmallIntType();
+ case INT:
+ case INTEGER:
+ return new IntType();
+ case BIGINT:
+ return new BigIntType();
+ case FLOAT:
+ return new FloatType();
+ case DOUBLE:
+ return parseDoubleType();
+ case DATE:
+ return new DateType();
+ case TIME:
+ return parseTimeType();
+ case TIMESTAMP:
+ return parseTimestampType();
+ case INTERVAL:
+ return parseIntervalType();
+ case ARRAY:
+ return parseArrayType();
+ case MULTISET:
+ return parseMultisetType();
+ case MAP:
+ return parseMapType();
+ case ROW:
+ return parseRowType();
+ case NULL:
+ return new NullType();
+ case ANY:
+ return parseAnyType();
+ default:
+ throw parsingError("Unsupported type: " + token().value);
+ }
+ }
+
+		/**
+		 * Parses a partially or fully qualified name of the form {@code [catalog].[database].[type]}
+		 * into an {@link UnresolvedUserDefinedType}. Missing leading parts are passed as null.
+		 */
+		private LogicalType parseTypeByIdentifier() {
+			nextToken(TokenType.IDENTIFIER);
+			final List<String> parts = new ArrayList<>();
+			parts.add(tokenAsString());
+			// at most two more parts may follow the first one
+			while (parts.size() < 3 && hasNextToken(TokenType.IDENTIFIER_SEPARATOR)) {
+				nextToken(TokenType.IDENTIFIER_SEPARATOR);
+				nextToken(TokenType.IDENTIFIER);
+				parts.add(tokenAsString());
+			}
+			return new UnresolvedUserDefinedType(
+				lastPart(parts, 2),
+				lastPart(parts, 1),
+				lastPart(parts, 0));
+		}
+
+		/**
+		 * Returns the part at the given distance from the end of the list (0 = last), or null if
+		 * the list is too short.
+		 */
+		private @Nullable String lastPart(List<String> parts, int inversePos) {
+			final int index = parts.size() - 1 - inversePos;
+			return index >= 0 ? parts.get(index) : null;
+		}
+
+		/**
+		 * Parses an optional length parameter such as {@code (10)}.
+		 *
+		 * @return the explicit length, or -1 if no length has been specified
+		 */
+		private int parseStringType() {
+			if (!hasNextToken(TokenType.BEGIN_PARAMETER)) {
+				return -1;
+			}
+			nextToken(TokenType.BEGIN_PARAMETER);
+			nextToken(TokenType.LITERAL_INT);
+			final int length = tokenAsInt();
+			nextToken(TokenType.END_PARAMETER);
+			return length;
+		}
+
+		/** Parses {@code CHAR} with an optional length; uses the type's default length otherwise. */
+		private LogicalType parseCharType() {
+			final int length = parseStringType();
+			return length < 0 ? new CharType() : new CharType(length);
+		}
+
+		/** Parses {@code VARCHAR} with an optional length; uses the type's default length otherwise. */
+		private LogicalType parseVarCharType() {
+			final int length = parseStringType();
+			return length < 0 ? new VarCharType() : new VarCharType(length);
+		}
+
+		/** Parses {@code BINARY} with an optional length; uses the type's default length otherwise. */
+		private LogicalType parseBinaryType() {
+			final int length = parseStringType();
+			return length < 0 ? new BinaryType() : new BinaryType(length);
+		}
+
+		/** Parses {@code VARBINARY} with an optional length; uses the type's default length otherwise. */
+		private LogicalType parseVarBinaryType() {
+			final int length = parseStringType();
+			return length < 0 ? new VarBinaryType() : new VarBinaryType(length);
+		}
+
+ private LogicalType parseDecimalType() {
+ int precision = DecimalType.DEFAULT_PRECISION;
+ int scale = DecimalType.DEFAULT_SCALE;
+ if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+ nextToken(TokenType.BEGIN_PARAMETER);
+ nextToken(TokenType.LITERAL_INT);
+ precision = tokenAsInt();
+ if (hasNextToken(TokenType.LIST_SEPARATOR)) {
+ nextToken(TokenType.LIST_SEPARATOR);
+ nextToken(TokenType.LITERAL_INT);
+ scale = tokenAsInt();
+ }
+ nextToken(TokenType.END_PARAMETER);
+ }
+ return new DecimalType(precision, scale);
+ }
+
+ private LogicalType parseDoubleType() {
+ if (hasNextToken(Keyword.PRECISION)) {
+ nextToken(Keyword.PRECISION);
+ }
+ return new DoubleType();
+ }
+
+ private LogicalType parseTimeType() {
+ int precision = parseOptionalPrecision(TimeType.DEFAULT_PRECISION);
+ if (hasNextToken(Keyword.WITHOUT)) {
+ nextToken(Keyword.WITHOUT);
+ nextToken(Keyword.TIME);
+ nextToken(Keyword.ZONE);
+ }
+ return new TimeType(precision);
+ }
+
+ private LogicalType parseTimestampType() {
+ int precision = parseOptionalPrecision(TimestampType.DEFAULT_PRECISION);
+ if (hasNextToken(Keyword.WITHOUT)) {
+ nextToken(Keyword.WITHOUT);
+ nextToken(Keyword.TIME);
+ nextToken(Keyword.ZONE);
+ } else if (hasNextToken(Keyword.WITH)) {
+ nextToken(Keyword.WITH);
+ if (hasNextToken(Keyword.LOCAL)) {
+ nextToken(Keyword.LOCAL);
+ nextToken(Keyword.TIME);
+ nextToken(Keyword.ZONE);
+ return new LocalZonedTimestampType(precision);
+ } else {
+ nextToken(Keyword.TIME);
+ nextToken(Keyword.ZONE);
+ return new ZonedTimestampType(precision);
+ }
+ }
+ return new TimestampType(precision);
+ }
+
+ private LogicalType parseIntervalType() {
+ nextToken(TokenType.KEYWORD);
+ switch (tokenAsKeyword()) {
+ case YEAR:
+ case MONTH:
+ return parseYearMonthIntervalType();
+ case DAY:
+ case HOUR:
+ case MINUTE:
+ case SECOND:
+ return parseDayTimeIntervalType();
+ default:
+ throw parsingError("Invalid interval resolution.");
+ }
+ }
+
+ private LogicalType parseYearMonthIntervalType() {
+ int yearPrecision = YearMonthIntervalType.DEFAULT_PRECISION;
+ switch (tokenAsKeyword()) {
+ case YEAR:
+ yearPrecision = parseOptionalPrecision(yearPrecision);
+ if (hasNextToken(Keyword.TO)) {
+ nextToken(Keyword.TO);
+ nextToken(Keyword.MONTH);
+ return new YearMonthIntervalType(
+ YearMonthResolution.YEAR_TO_MONTH,
+ yearPrecision);
+ }
+ return new YearMonthIntervalType(
+ YearMonthResolution.YEAR,
+ yearPrecision);
+ case MONTH:
+ return new YearMonthIntervalType(
+ YearMonthResolution.MONTH,
+ yearPrecision);
+ default:
+ throw parsingError("Invalid year-month interval resolution.");
+ }
+ }
+
+ /**
+ * Parses a day-time interval whose leading resolution keyword (DAY, HOUR, MINUTE or SECOND)
+ * is the current token, e.g. "DAY(2) TO SECOND(3)" or "HOUR TO MINUTE".
+ *
+ * <p>Only a leading DAY accepts a day precision, and only a (leading or trailing) SECOND
+ * accepts a fractional precision. Precisions that are not parsed keep the defaults of
+ * {@link DayTimeIntervalType}.
+ */
+ private LogicalType parseDayTimeIntervalType() {
+ int dayPrecision = DayTimeIntervalType.DEFAULT_DAY_PRECISION;
+ int fractionalPrecision = DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION;
+ switch (tokenAsKeyword()) {
+ // DAY [(p)] [TO HOUR | TO MINUTE | TO SECOND [(p)]]
+ case DAY:
+ dayPrecision = parseOptionalPrecision(dayPrecision);
+ if (hasNextToken(Keyword.TO)) {
+ nextToken(Keyword.TO);
+ nextToken(TokenType.KEYWORD);
+ switch (tokenAsKeyword()) {
+ case HOUR:
+ return new DayTimeIntervalType(
+ DayTimeResolution.DAY_TO_HOUR,
+ dayPrecision,
+ fractionalPrecision);
+ case MINUTE:
+ return new DayTimeIntervalType(
+ DayTimeResolution.DAY_TO_MINUTE,
+ dayPrecision,
+ fractionalPrecision);
+ case SECOND:
+ fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+ return new DayTimeIntervalType(
+ DayTimeResolution.DAY_TO_SECOND,
+ dayPrecision,
+ fractionalPrecision);
+ default:
+ throw parsingError("Invalid day-time interval resolution.");
+ }
+ }
+ return new DayTimeIntervalType(
+ DayTimeResolution.DAY,
+ dayPrecision,
+ fractionalPrecision);
+ // HOUR [TO MINUTE | TO SECOND [(p)]]; no day precision is parsed here
+ case HOUR:
+ if (hasNextToken(Keyword.TO)) {
+ nextToken(Keyword.TO);
+ nextToken(TokenType.KEYWORD);
+ switch (tokenAsKeyword()) {
+ case MINUTE:
+ return new DayTimeIntervalType(
+ DayTimeResolution.HOUR_TO_MINUTE,
+ dayPrecision,
+ fractionalPrecision);
+ case SECOND:
+ fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+ return new DayTimeIntervalType(
+ DayTimeResolution.HOUR_TO_SECOND,
+ dayPrecision,
+ fractionalPrecision);
+ default:
+ throw parsingError("Invalid day-time interval resolution.");
+ }
+ }
+ return new DayTimeIntervalType(
+ DayTimeResolution.HOUR,
+ dayPrecision,
+ fractionalPrecision);
+ // MINUTE [TO SECOND [(p)]]
+ case MINUTE:
+ if (hasNextToken(Keyword.TO)) {
+ nextToken(Keyword.TO);
+ nextToken(Keyword.SECOND);
+ fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+ return new DayTimeIntervalType(
+ DayTimeResolution.MINUTE_TO_SECOND,
+ dayPrecision,
+ fractionalPrecision);
+ }
+ return new DayTimeIntervalType(
+ DayTimeResolution.MINUTE,
+ dayPrecision,
+ fractionalPrecision);
+ // SECOND [(p)]; here the precision is the fractional precision
+ case SECOND:
+ fractionalPrecision = parseOptionalPrecision(fractionalPrecision);
+ return new DayTimeIntervalType(
+ DayTimeResolution.SECOND,
+ dayPrecision,
+ fractionalPrecision);
+ default:
+ throw parsingError("Invalid day-time interval resolution.");
+ }
+ }
+
+ private int parseOptionalPrecision(int defaultPrecision) {
+ int precision = defaultPrecision;
+ if (hasNextToken(TokenType.BEGIN_PARAMETER)) {
+ nextToken(TokenType.BEGIN_PARAMETER);
+ nextToken(TokenType.LITERAL_INT);
+ precision = tokenAsInt();
+ nextToken(TokenType.END_PARAMETER);
+ }
+ return precision;
+ }
+
+		/** Parses the element type of {@code ARRAY<...>}. */
+		private LogicalType parseArrayType() {
+			return new ArrayType(parseSingleSubtype());
+		}
+
+		/** Parses the element type of {@code MULTISET<...>}. */
+		private LogicalType parseMultisetType() {
+			return new MultisetType(parseSingleSubtype());
+		}
+
+		/** Parses the key and value types of {@code MAP<..., ...>}. */
+		private LogicalType parseMapType() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType keyType = parseTypeWithNullability();
+			nextToken(TokenType.LIST_SEPARATOR);
+			final LogicalType valueType = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return new MapType(keyType, valueType);
+		}
+
+		/** Parses exactly one type enclosed in angle brackets, e.g. {@code <INT NOT NULL>}. */
+		private LogicalType parseSingleSubtype() {
+			nextToken(TokenType.BEGIN_SUBTYPE);
+			final LogicalType subtype = parseTypeWithNullability();
+			nextToken(TokenType.END_SUBTYPE);
+			return subtype;
+		}
+
+		/**
+		 * Parses the fields of a row type in either the SQL standard notation {@code ROW(...)} or
+		 * the angle-bracket notation {@code ROW<...>}.
+		 */
+		private LogicalType parseRowType() {
+			final boolean isStandardNotation = hasNextToken(TokenType.BEGIN_PARAMETER);
+			final TokenType beginToken = isStandardNotation ? TokenType.BEGIN_PARAMETER : TokenType.BEGIN_SUBTYPE;
+			final TokenType endToken = isStandardNotation ? TokenType.END_PARAMETER : TokenType.END_SUBTYPE;
+			nextToken(beginToken);
+			final List<RowType.RowField> fields = parseRowFields(endToken);
+			nextToken(endToken);
+			return new RowType(fields);
+		}
+
+		/**
+		 * Parses a comma-separated list of {@code name type ['description']} entries until the
+		 * given end token is next. The end token itself is not consumed.
+		 */
+		private List<RowType.RowField> parseRowFields(TokenType endToken) {
+			final List<RowType.RowField> fields = new ArrayList<>();
+			while (!hasNextToken(endToken)) {
+				// every field except the first one is preceded by a comma
+				if (!fields.isEmpty()) {
+					nextToken(TokenType.LIST_SEPARATOR);
+				}
+				nextToken(TokenType.IDENTIFIER);
+				final String name = tokenAsString();
+				final LogicalType type = parseTypeWithNullability();
+				final RowType.RowField field;
+				if (hasNextToken(TokenType.LITERAL_STRING)) {
+					nextToken(TokenType.LITERAL_STRING);
+					field = new RowType.RowField(name, type, tokenAsString());
+				} else {
+					field = new RowType.RowField(name, type);
+				}
+				fields.add(field);
+			}
+			return fields;
+		}
+
+		/**
+		 * Parses {@code ANY('class name', 'serializer snapshot')}. The class is loaded with the
+		 * parser's class loader. The snapshot string is Base64-decoded, read as a versioned
+		 * {@link TypeSerializerSnapshot}, and its restored serializer is used for the
+		 * {@link AnyType}.
+		 *
+		 * @throws ValidationException if the syntax is invalid or the type cannot be restored
+		 */
+		@SuppressWarnings("unchecked")
+		private LogicalType parseAnyType() {
+			nextToken(TokenType.BEGIN_PARAMETER);
+			nextToken(TokenType.LITERAL_STRING);
+			final String className = tokenAsString();
+			nextToken(TokenType.LIST_SEPARATOR);
+			nextToken(TokenType.LITERAL_STRING);
+			final String serializer = tokenAsString();
+			nextToken(TokenType.END_PARAMETER);
+
+			try {
+				final Class<?> clazz = Class.forName(className, true, classLoader);
+				final DataInputDeserializer snapshotInput =
+					new DataInputDeserializer(EncodingUtils.decodeBase64ToBytes(serializer));
+				final TypeSerializerSnapshot<?> snapshot =
+					TypeSerializerSnapshot.readVersionedSnapshot(snapshotInput, classLoader);
+				return new AnyType(clazz, snapshot.restoreSerializer());
+			} catch (Throwable t) {
+				// every failure while restoring (class loading, decoding, snapshot reading) is
+				// reported as a parsing error with the original cause attached
+				throw parsingError(
+					"Unable to restore the ANY type of class '" + className + "' with " +
+						"serializer snapshot '" + serializer + "'.", t);
+			}
+		}
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
index 95d10e3..ca88427 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/EncodingUtils.java
@@ -110,12 +110,16 @@ public abstract class EncodingUtils {
return new String(java.util.Base64.getEncoder().encode(bytes), UTF_8);
}
+	/**
+	 * Decodes a string in the basic Base64 alphabet (as understood by
+	 * {@link java.util.Base64#getDecoder()}) into raw bytes.
+	 *
+	 * @param base64 the Base64-encoded string
+	 * @return the decoded bytes
+	 * @throws IllegalArgumentException if the input is not valid Base64
+	 */
+	public static byte[] decodeBase64ToBytes(String base64) {
+		final byte[] encodedBytes = base64.getBytes(UTF_8);
+		return java.util.Base64.getDecoder().decode(encodedBytes);
+	}
+
public static String encodeStringToBase64(String string) {
return encodeBytesToBase64(string.getBytes(UTF_8));
}
public static String decodeBase64ToString(String base64) {
- return new String(java.util.Base64.getDecoder().decode(base64.getBytes(UTF_8)), UTF_8);
+ return new String(decodeBase64ToBytes(base64), UTF_8);
}
public static byte[] md5(String string) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
index faf26d0..6bf414a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TypeStringUtils.java
@@ -32,13 +32,21 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import java.util.ArrayList;
import java.util.List;
/**
- * Utilities to convert {@link TypeInformation} into a string representation and back.
- */
+ * Utilities to convert {@link TypeInformation} into a string representation and back.
+ *
+ * @deprecated This utility is based on {@link TypeInformation}. However, the Table & SQL API is
+ * currently being migrated to {@link DataType}s based on {@link LogicalType}s. Use
+ * {@link LogicalTypeParser} instead.
+ */
+@Deprecated
@PublicEvolving
public class TypeStringUtils {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
new file mode 100644
index 0000000..df55424
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.UNRESOLVED;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeParser}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeParserTest {
+
+ @Parameters(name = "{index}: [From: {0}, To: {1}]")
+ public static List<TestSpec> testData() {
+ return Arrays.asList(
+
+ TestSpec
+ .forString("CHAR")
+ .expectType(new CharType()),
+
+ TestSpec
+ .forString("CHAR NOT NULL")
+ .expectType(new CharType().copy(false)),
+
+ TestSpec
+ .forString("CHAR NOT \t\nNULL")
+ .expectType(new CharType().copy(false)),
+
+ TestSpec
+ .forString("char not null")
+ .expectType(new CharType().copy(false)),
+
+ TestSpec
+ .forString("CHAR NULL")
+ .expectType(new CharType()),
+
+ TestSpec
+ .forString("CHAR(33)")
+ .expectType(new CharType(33)),
+
+ TestSpec
+ .forString("VARCHAR")
+ .expectType(new VarCharType()),
+
+ TestSpec
+ .forString("VARCHAR(33)")
+ .expectType(new VarCharType(33)),
+
+ TestSpec
+ .forString("STRING")
+ .expectType(new VarCharType(VarCharType.MAX_LENGTH)),
+
+ TestSpec
+ .forString("BOOLEAN")
+ .expectType(new BooleanType()),
+
+ TestSpec
+ .forString("BINARY")
+ .expectType(new BinaryType()),
+
+ TestSpec
+ .forString("BINARY(33)")
+ .expectType(new BinaryType(33)),
+
+ TestSpec
+ .forString("VARBINARY")
+ .expectType(new VarBinaryType()),
+
+ TestSpec
+ .forString("VARBINARY(33)")
+ .expectType(new VarBinaryType(33)),
+
+ TestSpec
+ .forString("BYTES")
+ .expectType(new VarBinaryType(VarBinaryType.MAX_LENGTH)),
+
+ TestSpec
+ .forString("DECIMAL")
+ .expectType(new DecimalType()),
+
+ TestSpec
+ .forString("DEC")
+ .expectType(new DecimalType()),
+
+ TestSpec
+ .forString("NUMERIC")
+ .expectType(new DecimalType()),
+
+ TestSpec
+ .forString("DECIMAL(10)")
+ .expectType(new DecimalType(10)),
+
+ TestSpec
+ .forString("DEC(10)")
+ .expectType(new DecimalType(10)),
+
+ TestSpec
+ .forString("NUMERIC(10)")
+ .expectType(new DecimalType(10)),
+
+ TestSpec
+ .forString("DECIMAL(10, 3)")
+ .expectType(new DecimalType(10, 3)),
+
+ TestSpec
+ .forString("DEC(10, 3)")
+ .expectType(new DecimalType(10, 3)),
+
+ TestSpec
+ .forString("NUMERIC(10, 3)")
+ .expectType(new DecimalType(10, 3)),
+
+ TestSpec
+ .forString("TINYINT")
+ .expectType(new TinyIntType()),
+
+ TestSpec
+ .forString("SMALLINT")
+ .expectType(new SmallIntType()),
+
+ TestSpec
+ .forString("INTEGER")
+ .expectType(new IntType()),
+
+ TestSpec
+ .forString("INT")
+ .expectType(new IntType()),
+
+ TestSpec
+ .forString("BIGINT")
+ .expectType(new BigIntType()),
+
+ TestSpec
+ .forString("FLOAT")
+ .expectType(new FloatType()),
+
+ TestSpec
+ .forString("DOUBLE")
+ .expectType(new DoubleType()),
+
+ TestSpec
+ .forString("DOUBLE PRECISION")
+ .expectType(new DoubleType()),
+
+ TestSpec
+ .forString("DATE")
+ .expectType(new DateType()),
+
+ TestSpec
+ .forString("TIME")
+ .expectType(new TimeType()),
+
+ TestSpec
+ .forString("TIME(3)")
+ .expectType(new TimeType(3)),
+
+ TestSpec
+ .forString("TIME WITHOUT TIME ZONE")
+ .expectType(new TimeType()),
+
+ TestSpec
+ .forString("TIME(3) WITHOUT TIME ZONE")
+ .expectType(new TimeType(3)),
+
+ TestSpec
+ .forString("TIMESTAMP")
+ .expectType(new TimestampType()),
+
+ TestSpec
+ .forString("TIMESTAMP(3)")
+ .expectType(new TimestampType(3)),
+
+ TestSpec
+ .forString("TIMESTAMP WITHOUT TIME ZONE")
+ .expectType(new TimestampType()),
+
+ TestSpec
+ .forString("TIMESTAMP(3) WITHOUT TIME ZONE")
+ .expectType(new TimestampType(3)),
+
+ TestSpec
+ .forString("TIMESTAMP WITH TIME ZONE")
+ .expectType(new ZonedTimestampType()),
+
+ TestSpec
+ .forString("TIMESTAMP(3) WITH TIME ZONE")
+ .expectType(new ZonedTimestampType(3)),
+
+ TestSpec
+ .forString("TIMESTAMP WITH LOCAL TIME ZONE")
+ .expectType(new LocalZonedTimestampType()),
+
+ TestSpec
+ .forString("TIMESTAMP(3) WITH LOCAL TIME ZONE")
+ .expectType(new LocalZonedTimestampType(3)),
+
+ TestSpec
+ .forString("INTERVAL YEAR")
+ .expectType(new YearMonthIntervalType(YearMonthResolution.YEAR)),
+
+ TestSpec
+ .forString("INTERVAL YEAR(4)")
+ .expectType(new YearMonthIntervalType(YearMonthResolution.YEAR, 4)),
+
+ TestSpec
+ .forString("INTERVAL MONTH")
+ .expectType(new YearMonthIntervalType(YearMonthResolution.MONTH)),
+
+ TestSpec
+ .forString("INTERVAL YEAR TO MONTH")
+ .expectType(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH)),
+
+ TestSpec
+ .forString("INTERVAL YEAR(4) TO MONTH")
+ .expectType(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, 4)),
+
+ TestSpec
+ .forString("INTERVAL DAY(2) TO SECOND(3)")
+ .expectType(new DayTimeIntervalType(DayTimeResolution.DAY_TO_SECOND, 2, 3)),
+
+ TestSpec
+ .forString("INTERVAL HOUR TO SECOND(3)")
+ .expectType(
+ new DayTimeIntervalType(
+ DayTimeResolution.HOUR_TO_SECOND,
+ DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+ 3)
+ ),
+
+ TestSpec
+ .forString("INTERVAL MINUTE")
+ .expectType(new DayTimeIntervalType(DayTimeResolution.MINUTE)),
+
+ TestSpec
+ .forString("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ .expectType(new ArrayType(new LocalZonedTimestampType(3))),
+
+ TestSpec
+ .forString("ARRAY<INT NOT NULL>")
+ .expectType(new ArrayType(new IntType(false))),
+
+ TestSpec
+ .forString("INT ARRAY")
+ .expectType(new ArrayType(new IntType())),
+
+ TestSpec
+ .forString("INT NOT NULL ARRAY")
+ .expectType(new ArrayType(new IntType(false))),
+
+ TestSpec
+ .forString("INT ARRAY NOT NULL")
+ .expectType(new ArrayType(false, new IntType())),
+
+ TestSpec
+ .forString("MULTISET<INT NOT NULL>")
+ .expectType(new MultisetType(new IntType(false))),
+
+ TestSpec
+ .forString("INT MULTISET")
+ .expectType(new MultisetType(new IntType())),
+
+ TestSpec
+ .forString("INT NOT NULL MULTISET")
+ .expectType(new MultisetType(new IntType(false))),
+
+ TestSpec
+ .forString("INT MULTISET NOT NULL")
+ .expectType(new MultisetType(false, new IntType())),
+
+ TestSpec
+ .forString("MAP<BIGINT, BOOLEAN>")
+ .expectType(new MapType(new BigIntType(), new BooleanType())),
+
+ TestSpec
+ .forString("ROW<f0 INT NOT NULL, f1 BOOLEAN>")
+ .expectType(
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new IntType(false)),
+ new RowType.RowField("f1", new BooleanType())))
+ ),
+
+ TestSpec
+ .forString("ROW(f0 INT NOT NULL, f1 BOOLEAN)")
+ .expectType(
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new IntType(false)),
+ new RowType.RowField("f1", new BooleanType())))
+ ),
+
+ TestSpec
+ .forString("ROW<`f0` INT>")
+ .expectType(
+ new RowType(
+ Collections.singletonList(new RowType.RowField("f0", new IntType())))
+ ),
+
+ TestSpec
+ .forString("ROW(`f0` INT)")
+ .expectType(
+ new RowType(
+ Collections.singletonList(new RowType.RowField("f0", new IntType())))
+ ),
+
+ TestSpec
+ .forString("ROW<>")
+ .expectType(new RowType(Collections.emptyList())),
+
+ TestSpec
+ .forString("ROW()")
+ .expectType(new RowType(Collections.emptyList())),
+
+ TestSpec
+ .forString("ROW<f0 INT NOT NULL 'This is a comment.', f1 BOOLEAN 'This as well.'>")
+ .expectType(
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("f0", new IntType(false), "This is a comment."),
+ new RowType.RowField("f1", new BooleanType(), "This as well.")))
+ ),
+
+ TestSpec
+ .forString("NULL")
+ .expectType(new NullType()),
+
+ TestSpec
+ .forString(createAnyType(LogicalTypeParserTest.class).asSerializableString())
+ .expectType(createAnyType(LogicalTypeParserTest.class)),
+
+ TestSpec
+ .forString("cat.db.MyType")
+ .expectType(new UnresolvedUserDefinedType("cat", "db", "MyType")),
+
+ TestSpec
+ .forString("`db`.`MyType`")
+ .expectType(new UnresolvedUserDefinedType(null, "db", "MyType")),
+
+ TestSpec
+ .forString("MyType")
+ .expectType(new UnresolvedUserDefinedType(null, null, "MyType")),
+
+ TestSpec
+ .forString("ARRAY<MyType>")
+ .expectType(new ArrayType(new UnresolvedUserDefinedType(null, null, "MyType"))),
+
+ TestSpec
+ .forString("ROW<f0 MyType, f1 `c`.`d`.`t`>")
+ .expectType(
+ RowType.of(
+ new UnresolvedUserDefinedType(null, null, "MyType"),
+ new UnresolvedUserDefinedType("c", "d", "t"))
+ ),
+
+ // error message testing
+
+ TestSpec
+ .forString("ROW<`f0")
+ .expectErrorMessage("Unexpected end"),
+
+ TestSpec
+ .forString("ROW<`f0`")
+ .expectErrorMessage("Unexpected end"),
+
+ TestSpec
+ .forString("VARCHAR(test)")
+ .expectErrorMessage("<LITERAL_INT> expected"),
+
+ TestSpec
+ .forString("VARCHAR(33333333333)")
+ .expectErrorMessage("Invalid integer value"),
+
+ TestSpec
+ .forString("ROW<field INT, field2>")
+ .expectErrorMessage("<KEYWORD> expected"),
+
+ TestSpec
+ .forString("ANY('unknown.class', '')")
+ .expectErrorMessage("Unable to restore the ANY type")
+ );
+ }
+
+ @Parameter
+ public TestSpec testSpec;
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testParsing() {
+ if (testSpec.expectedType != null) {
+ assertThat(
+ LogicalTypeParser.parse(testSpec.typeString),
+ equalTo(testSpec.expectedType));
+ }
+ }
+
+ @Test
+ public void testSerializableParsing() {
+ if (testSpec.expectedType != null) {
+ if (!hasRoot(testSpec.expectedType, UNRESOLVED) &&
+ testSpec.expectedType.getChildren().stream().noneMatch(t -> hasRoot(t, UNRESOLVED))) {
+ assertThat(
+ LogicalTypeParser.parse(testSpec.expectedType.asSerializableString()),
+ equalTo(testSpec.expectedType));
+ }
+ }
+ }
+
+ @Test
+ public void testErrorMessage() {
+ if (testSpec.expectedErrorMessage != null) {
+ thrown.expect(ValidationException.class);
+ thrown.expectMessage(testSpec.expectedErrorMessage);
+
+ LogicalTypeParser.parse(testSpec.typeString);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static class TestSpec {
+
+ private final String typeString;
+
+ private @Nullable LogicalType expectedType;
+
+ private @Nullable String expectedErrorMessage;
+
+ private TestSpec(String typeString) {
+ this.typeString = typeString;
+ }
+
+ static TestSpec forString(String typeString) {
+ return new TestSpec(typeString);
+ }
+
+ TestSpec expectType(LogicalType expectedType) {
+ this.expectedType = expectedType;
+ return this;
+ }
+
+ TestSpec expectErrorMessage(String expectedErrorMessage) {
+ this.expectedErrorMessage = expectedErrorMessage;
+ return this;
+ }
+ }
+
+ private static <T> AnyType<T> createAnyType(Class<T> clazz) {
+ return new AnyType<>(clazz, new KryoSerializer<>(clazz, new ExecutionConfig()));
+ }
+}