Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/29 06:19:30 UTC

[flink] branch master updated (d3e5bf6 -> 1b9fb2b)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d3e5bf6  [FLINK-9172][sql-client][table] Support catalogs in SQL-Client yaml config file
     new 5b91db9  [hotfix][table-common] Update CHAR and BINARY in accordance with the SQL standard
     new 1b525fd  [hotfix][table-common] Add missing getter to TimeType
     new c5eb8a7  [hotfix][table-common] Add more logical type check utilities
     new 551b8ef  [hotfix][table-common] Add a value-to-data-type converter
     new 1b9fb2b  [hotfix][table-common] Fix invalid class to data type conversion

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/table/api/DataTypes.java |   2 +-
 .../flink/table/types/logical/BinaryType.java      |  26 ++-
 .../apache/flink/table/types/logical/CharType.java |  33 ++-
 .../apache/flink/table/types/logical/TimeType.java |   4 +
 .../types/logical/utils/LogicalTypeChecks.java     | 183 ++++++++++++++--
 .../table/types/utils/ClassDataTypeConverter.java  |   2 +-
 .../table/types/utils/ValueDataTypeConverter.java  | 231 +++++++++++++++++++++
 .../table/types/ClassDataTypeConverterTest.java    |   2 +
 .../apache/flink/table/types/LogicalTypesTest.java |   2 +-
 .../table/types/ValueDataTypeConverterTest.java    | 160 ++++++++++++++
 10 files changed, 624 insertions(+), 21 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java


[flink] 04/05: [hotfix][table-common] Add a value-to-data-type converter

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 551b8ef507109218a428baac2d5682fec2381a4f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 27 12:45:13 2019 +0200

    [hotfix][table-common] Add a value-to-data-type converter
---
 .../table/types/utils/ValueDataTypeConverter.java  | 231 +++++++++++++++++++++
 .../table/types/ValueDataTypeConverterTest.java    | 160 ++++++++++++++
 2 files changed, 391 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
new file mode 100644
index 0000000..3b311ce
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Value-based data type extractor that supports extraction of clearly identifiable data types for
+ * input conversion.
+ *
+ * <p>This converter is more precise than {@link ClassDataTypeConverter} because it also considers
+ * nullability, length, precision, and scale of values.
+ */
+@Internal
+public final class ValueDataTypeConverter {
+
+	/**
+	 * Returns the clearly identifiable data type if possible. For example, {@code 12L} can be
+	 * expressed as {@code DataTypes.BIGINT().notNull()}. However, {@code null}, for example, could
+	 * be of any type and is therefore not supported.
+	 *
+	 * <p>All types of the {@link LogicalTypeFamily#PREDEFINED} family and arrays are supported.
+	 */
+	public static Optional<DataType> extractDataType(Object value) {
+		if (value == null) {
+			return Optional.empty();
+		}
+
+		DataType convertedDataType = null;
+
+		if (value instanceof String) {
+			convertedDataType = convertToCharType((String) value);
+		}
+
+		// byte arrays have higher priority than regular arrays
+		else if (value instanceof byte[]) {
+			convertedDataType = convertToBinaryType((byte[]) value);
+		}
+
+		else if (value instanceof BigDecimal) {
+			convertedDataType = convertToDecimalType((BigDecimal) value);
+		}
+
+		else if (value instanceof java.time.LocalTime) {
+			convertedDataType = convertToTimeType((java.time.LocalTime) value);
+		}
+
+		else if (value instanceof java.time.LocalDateTime) {
+			convertedDataType = convertToTimestampType(((java.time.LocalDateTime) value).getNano());
+		}
+
+		else if (value instanceof java.sql.Timestamp) {
+			convertedDataType = convertToTimestampType(((java.sql.Timestamp) value).getNanos());
+		}
+
+		else if (value instanceof java.time.ZonedDateTime) {
+			convertedDataType = convertToZonedTimestampType(((java.time.ZonedDateTime) value).getNano());
+		}
+
+		else if (value instanceof java.time.OffsetDateTime) {
+			convertedDataType = convertToZonedTimestampType(((java.time.OffsetDateTime) value).getNano());
+		}
+
+		else if (value instanceof java.time.Instant) {
+			convertedDataType = convertToLocalZonedTimestampType(((java.time.Instant) value).getNano());
+		}
+
+		else if (value instanceof java.time.Period) {
+			convertedDataType = convertToYearMonthIntervalType(((java.time.Period) value).getYears());
+		}
+
+		else if (value instanceof java.time.Duration) {
+			final java.time.Duration duration = (java.time.Duration) value;
+			convertedDataType = convertToDayTimeIntervalType(duration.toDays(), duration.getNano());
+		}
+
+		else if (value instanceof Object[]) {
+			// don't let the class-based extraction kick in if array elements differ
+			return convertToArrayType((Object[]) value)
+				.map(dt -> dt.notNull().bridgedTo(value.getClass()));
+		}
+
+		final Optional<DataType> resultType;
+		if (convertedDataType != null) {
+			resultType = Optional.of(convertedDataType);
+		} else {
+			// class-based extraction is possible for BOOLEAN, TINYINT, SMALLINT, INT, FLOAT, DOUBLE,
+			// DATE, TIME with java.sql.Time, and arrays of primitive types
+			resultType = ClassDataTypeConverter.extractDataType(value.getClass());
+		}
+		return resultType.map(dt -> dt.notNull().bridgedTo(value.getClass()));
+	}
+
+	private static DataType convertToCharType(String string) {
+		if (string.isEmpty()) {
+			return new AtomicDataType(CharType.ofEmptyLiteral());
+		}
+		return DataTypes.CHAR(string.length());
+	}
+
+	private static DataType convertToBinaryType(byte[] bytes) {
+		if (bytes.length == 0) {
+			return new AtomicDataType(BinaryType.ofEmptyLiteral());
+		}
+		return DataTypes.BINARY(bytes.length);
+	}
+
+	private static DataType convertToDecimalType(BigDecimal decimal) {
+		// let underlying layers check if precision and scale are supported
+		return DataTypes.DECIMAL(decimal.precision(), decimal.scale());
+	}
+
+	private static DataType convertToTimeType(java.time.LocalTime time) {
+		return DataTypes.TIME(fractionalSecondPrecision(time.getNano()));
+	}
+
+	private static DataType convertToTimestampType(int nanos) {
+		return DataTypes.TIMESTAMP(fractionalSecondPrecision(nanos));
+	}
+
+	private static DataType convertToZonedTimestampType(int nanos) {
+		return DataTypes.TIMESTAMP_WITH_TIME_ZONE(fractionalSecondPrecision(nanos));
+	}
+
+	private static DataType convertToLocalZonedTimestampType(int nanos) {
+		return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(fractionalSecondPrecision(nanos));
+	}
+
+	private static DataType convertToYearMonthIntervalType(int years) {
+		return DataTypes.INTERVAL(DataTypes.YEAR(yearPrecision(years)), DataTypes.MONTH());
+	}
+
+	private static DataType convertToDayTimeIntervalType(long days, int nanos) {
+		return DataTypes.INTERVAL(
+			DataTypes.DAY(dayPrecision(days)),
+			DataTypes.SECOND(fractionalSecondPrecision(nanos)));
+	}
+
+	private static Optional<DataType> convertToArrayType(Object[] array) {
+	// fall back to class-based extraction if no values exist
+		if (array.length == 0 || Stream.of(array).allMatch(Objects::isNull)) {
+			return extractElementTypeFromClass(array);
+		}
+
+		return extractElementTypeFromValues(array);
+	}
+
+	private static Optional<DataType> extractElementTypeFromValues(Object[] array) {
+		DataType elementType = null;
+		for (Object element : array) {
+			// null values are wildcard array elements
+			if (element == null) {
+				continue;
+			}
+
+			final Optional<DataType> possibleElementType = extractDataType(element);
+			if (!possibleElementType.isPresent()) {
+				return Optional.empty();
+			}
+
+			// for simplification, we assume that array elements can always be nullable
+			// otherwise mismatches could occur when dealing with nested arrays
+			final DataType extractedElementType = possibleElementType.get().nullable();
+
+			// ensure that all elements have the same type;
+			// in theory the logic could be improved by converting an array with elements
+			// [CHAR(1), CHAR(2)] into an array of CHAR(2) but this can lead to value
+			// modification (i.e. adding spaces) which is not intended.
+			if (elementType != null && !extractedElementType.equals(elementType)) {
+				return Optional.empty();
+			}
+			elementType = extractedElementType;
+		}
+
+		return Optional.ofNullable(elementType)
+			.map(DataTypes::ARRAY);
+	}
+
+	private static Optional<DataType> extractElementTypeFromClass(Object[] array) {
+		final Optional<DataType> possibleElementType =
+			ClassDataTypeConverter.extractDataType(array.getClass().getComponentType());
+
+		// for simplification, we assume that array elements can always be nullable
+		return possibleElementType
+			.map(DataType::nullable)
+			.map(DataTypes::ARRAY);
+	}
+
+	private static int fractionalSecondPrecision(int nanos) {
+		return String.format("%09d", nanos).replaceAll("0+$", "").length();
+	}
+
+	private static int yearPrecision(int years) {
+		return String.valueOf(years).length();
+	}
+
+	private static int dayPrecision(long days) {
+		return String.valueOf(days).length();
+	}
+
+	private ValueDataTypeConverter() {
+		// no instantiation
+	}
+}
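
For illustration, a minimal usage sketch of the converter added above. This is hypothetical driver
code, not part of the commit; it only calls ValueDataTypeConverter.extractDataType as defined in
the diff, and the expected results follow the Javadoc and the test data below.

    import java.math.BigDecimal;
    import java.util.Optional;

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.utils.ValueDataTypeConverter;

    public class ValueExtractionExample {
        public static void main(String[] args) {
            // "Hello" has 5 code points, so it maps to CHAR(5) NOT NULL
            Optional<DataType> charType = ValueDataTypeConverter.extractDataType("Hello");

            // precision 5 and scale 3 of the value carry over -> DECIMAL(5, 3) NOT NULL
            Optional<DataType> decimalType =
                ValueDataTypeConverter.extractDataType(new BigDecimal("12.123"));

            // null carries no type information -> Optional.empty()
            Optional<DataType> unknown = ValueDataTypeConverter.extractDataType(null);

            System.out.println(charType);
            System.out.println(decimalType);
            System.out.println(unknown);
        }
    }
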
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java
new file mode 100644
index 0000000..9c00181
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ValueDataTypeConverter}.
+ */
+@RunWith(Parameterized.class)
+public class ValueDataTypeConverterTest {
+
+	@Parameterized.Parameters(name = "[{index}] value: {0} type: {1}")
+	public static List<Object[]> testData() {
+		return Arrays.asList(
+			new Object[][]{
+
+				{"Hello World", DataTypes.CHAR(11)},
+
+				{"", new AtomicDataType(CharType.ofEmptyLiteral())},
+
+				{new byte[]{1, 2, 3}, DataTypes.BINARY(3)},
+
+				{new byte[0], new AtomicDataType(BinaryType.ofEmptyLiteral())},
+
+				{BigDecimal.ZERO, DataTypes.DECIMAL(1, 0)},
+
+				{new BigDecimal("12.123"), DataTypes.DECIMAL(5, 3)},
+
+				{12, DataTypes.INT()},
+
+				{LocalTime.of(13, 24, 25, 1000), DataTypes.TIME(6)},
+
+				{LocalTime.of(13, 24, 25, 0), DataTypes.TIME(0)},
+
+				{LocalTime.of(13, 24, 25, 1), DataTypes.TIME(9)},
+
+				{LocalTime.of(13, 24, 25, 999_999_999), DataTypes.TIME(9)},
+
+				{LocalDateTime.of(2019, 11, 11, 13, 24, 25, 1001), DataTypes.TIMESTAMP(9)},
+
+				{
+					ZonedDateTime.of(2019, 11, 11, 13, 24, 25, 1001, ZoneId.systemDefault()),
+					DataTypes.TIMESTAMP_WITH_TIME_ZONE(9).bridgedTo(ZonedDateTime.class)
+				},
+
+				{
+					OffsetDateTime.of(2019, 11, 11, 13, 24, 25, 1001, ZoneOffset.UTC),
+					DataTypes.TIMESTAMP_WITH_TIME_ZONE(9).bridgedTo(OffsetDateTime.class)
+				},
+
+				{
+					Instant.ofEpochMilli(12345602021L),
+					DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).bridgedTo(Instant.class)
+				},
+
+				{
+					Period.ofYears(1000),
+					DataTypes.INTERVAL(DataTypes.YEAR(4), DataTypes.MONTH()).bridgedTo(Period.class)
+				},
+
+				{
+					Duration.ofMillis(1100),
+					DataTypes.INTERVAL(DataTypes.DAY(1), DataTypes.SECOND(1)).bridgedTo(Duration.class)
+				},
+
+				{
+					Duration.ofDays(42),
+					DataTypes.INTERVAL(DataTypes.DAY(2), DataTypes.SECOND(0)).bridgedTo(Duration.class)
+				},
+
+				{
+					Timestamp.valueOf("2018-01-01 12:13:14.123"),
+					DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)
+				},
+
+				{new Integer[]{1, 2, 3}, DataTypes.ARRAY(DataTypes.INT())},
+
+				{new Integer[]{1, null, 3}, DataTypes.ARRAY(DataTypes.INT())},
+
+				{
+					new BigDecimal[]{new BigDecimal("12.1234"), new BigDecimal("42.4321"), new BigDecimal("20.0000")},
+					DataTypes.ARRAY(DataTypes.DECIMAL(6, 4))
+				},
+
+				{
+					new BigDecimal[]{null, new BigDecimal("42.4321")},
+					DataTypes.ARRAY(DataTypes.DECIMAL(6, 4))
+				},
+
+				{new Integer[0], DataTypes.ARRAY(DataTypes.INT())},
+
+				{
+					new Integer[][]{new Integer[]{1, null, 3}, new Integer[0], new Integer[]{1}},
+					DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))
+				},
+
+				{
+					new BigDecimal[0],
+					null
+				},
+			}
+		);
+	}
+
+	@Parameterized.Parameter
+	public Object value;
+
+	@Parameterized.Parameter(1)
+	public @Nullable DataType dataType;
+
+	@Test
+	public void testValueToDataTypeConversion() {
+		assertEquals(
+			Optional.ofNullable(dataType).map(DataType::notNull),
+			ValueDataTypeConverter.extractDataType(value));
+	}
+}
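
The TIME precisions asserted above follow from the private fractionalSecondPrecision helper: the
nanosecond component is zero-padded to nine digits, trailing zeros are stripped, and the remaining
length is the precision. A self-contained sketch of that computation (a standalone copy for
illustration only; the helper itself is private in the commit):

    public class PrecisionExample {

        // mirrors ValueDataTypeConverter#fractionalSecondPrecision
        static int fractionalSecondPrecision(int nanos) {
            return String.format("%09d", nanos).replaceAll("0+$", "").length();
        }

        public static void main(String[] args) {
            System.out.println(fractionalSecondPrecision(1_000));       // "000001000" -> "000001" -> 6, hence TIME(6)
            System.out.println(fractionalSecondPrecision(0));           // "000000000" -> ""       -> 0, hence TIME(0)
            System.out.println(fractionalSecondPrecision(1));           // "000000001" -> 9 digits -> 9, hence TIME(9)
            System.out.println(fractionalSecondPrecision(999_999_999)); // "999999999" -> 9 digits -> 9, hence TIME(9)
        }
    }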


[flink] 03/05: [hotfix][table-common] Add more logical type check utilities

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5eb8a7bdf379514fb50fc6703ae984d3cacb64b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 27 12:10:55 2019 +0200

    [hotfix][table-common] Add more logical type check utilities
---
 .../types/logical/utils/LogicalTypeChecks.java     | 183 +++++++++++++++++++--
 1 file changed, 171 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
index 3958922..24f1578 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -19,21 +19,41 @@
 package org.apache.flink.table.types.logical.utils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
 
 /**
- * Utilities for checking {@link LogicalType}.
+ * Utilities for checking {@link LogicalType} and avoiding a lot of type casting and repetitive work.
  */
 @Internal
 public final class LogicalTypeChecks {
 
-	private static final TimeAttributeChecker TIME_ATTRIBUTE_CHECKER = new TimeAttributeChecker();
+	private static final TimestampKindExtractor TIMESTAMP_KIND_EXTRACTOR = new TimestampKindExtractor();
+
+	private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor();
+
+	private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor();
+
+	private static final ScaleExtractor SCALE_EXTRACTOR = new ScaleExtractor();
+
+	private static final YearPrecisionExtractor YEAR_PRECISION_EXTRACTOR = new YearPrecisionExtractor();
+
+	private static final DayPrecisionExtractor DAY_PRECISION_EXTRACTOR = new DayPrecisionExtractor();
+
+	private static final FractionalPrecisionExtractor FRACTIONAL_PRECISION_EXTRACTOR = new FractionalPrecisionExtractor();
 
 	public static boolean hasRoot(LogicalType logicalType, LogicalTypeRoot typeRoot) {
 		return logicalType.getTypeRoot() == typeRoot;
@@ -44,15 +64,63 @@ public final class LogicalTypeChecks {
 	}
 
 	public static boolean isTimeAttribute(LogicalType logicalType) {
-		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) != TimestampKind.REGULAR;
+		return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR;
 	}
 
 	public static boolean isRowtimeAttribute(LogicalType logicalType) {
-		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.ROWTIME;
+		return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) == TimestampKind.ROWTIME;
 	}
 
 	public static boolean isProctimeAttribute(LogicalType logicalType) {
-		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.PROCTIME;
+		return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) == TimestampKind.PROCTIME;
+	}
+
+	public static int getLength(LogicalType logicalType) {
+		return logicalType.accept(LENGTH_EXTRACTOR);
+	}
+
+	public static boolean hasLength(LogicalType logicalType, int length) {
+		return getLength(logicalType) == length;
+	}
+
+	public static int getPrecision(LogicalType logicalType) {
+		return logicalType.accept(PRECISION_EXTRACTOR);
+	}
+
+	public static boolean hasPrecision(LogicalType logicalType, int precision) {
+		return getPrecision(logicalType) == precision;
+	}
+
+	public static int getScale(LogicalType logicalType) {
+		return logicalType.accept(SCALE_EXTRACTOR);
+	}
+
+	public static boolean hasScale(LogicalType logicalType, int scale) {
+		return getScale(logicalType) == scale;
+	}
+
+	public static int getYearPrecision(LogicalType logicalType) {
+		return logicalType.accept(YEAR_PRECISION_EXTRACTOR);
+	}
+
+	public static boolean hasYearPrecision(LogicalType logicalType, int yearPrecision) {
+		return getYearPrecision(logicalType) == yearPrecision;
+	}
+
+	public static int getDayPrecision(LogicalType logicalType) {
+		return logicalType.accept(DAY_PRECISION_EXTRACTOR);
+	}
+
+	public static boolean hasDayPrecision(LogicalType logicalType, int dayPrecision) {
+		return getDayPrecision(logicalType) == dayPrecision;
+	}
+
+	public static int getFractionalPrecision(LogicalType logicalType) {
+		return logicalType.accept(FRACTIONAL_PRECISION_EXTRACTOR);
+	}
+
+	public static boolean hasFractionalPrecision(LogicalType logicalType, int fractionalPrecision) {
+		return getFractionalPrecision(logicalType) == fractionalPrecision;
 	}
 
 	private LogicalTypeChecks() {
@@ -61,7 +129,104 @@ public final class LogicalTypeChecks {
 
 	// --------------------------------------------------------------------------------------------
 
-	private static class TimeAttributeChecker extends LogicalTypeDefaultVisitor<TimestampKind> {
+	/**
+	 * Extracts an attribute from those logical types that define it.
+	 */
+	private static class Extractor<T> extends LogicalTypeDefaultVisitor<T> {
+		@Override
+		protected T defaultMethod(LogicalType logicalType) {
+			throw new IllegalArgumentException(
+				String.format(
+					"Invalid use of extractor %s. Called on logical type: %s",
+					this.getClass().getName(),
+					logicalType));
+		}
+	}
+
+	private static class LengthExtractor extends Extractor<Integer> {
+
+		@Override
+		public Integer visit(CharType charType) {
+			return charType.getLength();
+		}
+
+		@Override
+		public Integer visit(VarCharType varCharType) {
+			return varCharType.getLength();
+		}
+
+		@Override
+		public Integer visit(BinaryType binaryType) {
+			return binaryType.getLength();
+		}
+
+		@Override
+		public Integer visit(VarBinaryType varBinaryType) {
+			return varBinaryType.getLength();
+		}
+	}
+
+	private static class PrecisionExtractor extends Extractor<Integer> {
+
+		@Override
+		public Integer visit(DecimalType decimalType) {
+			return decimalType.getPrecision();
+		}
+
+		@Override
+		public Integer visit(TimeType timeType) {
+			return timeType.getPrecision();
+		}
+
+		@Override
+		public Integer visit(TimestampType timestampType) {
+			return timestampType.getPrecision();
+		}
+
+		@Override
+		public Integer visit(ZonedTimestampType zonedTimestampType) {
+			return zonedTimestampType.getPrecision();
+		}
+
+		@Override
+		public Integer visit(LocalZonedTimestampType localZonedTimestampType) {
+			return localZonedTimestampType.getPrecision();
+		}
+	}
+
+	private static class ScaleExtractor extends Extractor<Integer> {
+
+		@Override
+		public Integer visit(DecimalType decimalType) {
+			return decimalType.getScale();
+		}
+	}
+
+	private static class YearPrecisionExtractor extends Extractor<Integer> {
+
+		@Override
+		public Integer visit(YearMonthIntervalType yearMonthIntervalType) {
+			return yearMonthIntervalType.getYearPrecision();
+		}
+	}
+
+	private static class DayPrecisionExtractor extends Extractor<Integer> {
+
+		@Override
+		public Integer visit(DayTimeIntervalType dayTimeIntervalType) {
+			return dayTimeIntervalType.getDayPrecision();
+		}
+	}
+
+	private static class FractionalPrecisionExtractor extends Extractor<Integer> {
+
+		@Override
+		public Integer visit(DayTimeIntervalType dayTimeIntervalType) {
+			return dayTimeIntervalType.getFractionalPrecision();
+		}
+	}
+
+	private static class TimestampKindExtractor extends Extractor<TimestampKind> {
 
 		@Override
 		public TimestampKind visit(TimestampType timestampType) {
@@ -77,11 +242,5 @@ public final class LogicalTypeChecks {
 		public TimestampKind visit(LocalZonedTimestampType localZonedTimestampType) {
 			return localZonedTimestampType.getKind();
 		}
-
-		@Override
-		protected TimestampKind defaultMethod(LogicalType logicalType) {
-			// we don't verify that type is actually a timestamp
-			return TimestampKind.REGULAR;
-		}
 	}
 }
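
A brief, hypothetical usage sketch of the new extractors (caller code not part of the commit; it
uses only the public methods added above together with the CharType and DecimalType constructors):

    import org.apache.flink.table.types.logical.CharType;
    import org.apache.flink.table.types.logical.DecimalType;
    import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

    public class ChecksExample {
        public static void main(String[] args) {
            // read the length of a character string type without casting to CharType
            int length = LogicalTypeChecks.getLength(new CharType(12)); // 12

            // check precision and scale of a decimal without casting to DecimalType
            DecimalType decimal = new DecimalType(10, 2);
            boolean matches = LogicalTypeChecks.hasPrecision(decimal, 10)
                && LogicalTypeChecks.hasScale(decimal, 2); // true

            // note: calling an extractor on a type that does not define the attribute,
            // e.g. getScale(new CharType(12)), throws IllegalArgumentException
            System.out.println(length + " " + matches);
        }
    }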


[flink] 02/05: [hotfix][table-common] Add missing getter to TimeType

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b525fdd83d9fe6090d07b170370f7900d2367d5
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 27 11:47:55 2019 +0200

    [hotfix][table-common] Add missing getter to TimeType
---
 .../src/main/java/org/apache/flink/table/types/logical/TimeType.java  | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
index f55ddf3..15763d3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
@@ -89,6 +89,10 @@ public final class TimeType extends LogicalType {
 		this(DEFAULT_PRECISION);
 	}
 
+	public int getPrecision() {
+		return precision;
+	}
+
 	@Override
 	public LogicalType copy(boolean isNullable) {
 		return new TimeType(isNullable, precision);
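
A tiny, hypothetical usage sketch of the new getter (not part of the commit):

    import org.apache.flink.table.types.logical.TimeType;

    public class TimeTypeExample {
        public static void main(String[] args) {
            // the precision passed to the constructor is now retrievable
            System.out.println(new TimeType(3).getPrecision()); // 3
            // the no-arg constructor falls back to DEFAULT_PRECISION
            System.out.println(new TimeType().getPrecision());
        }
    }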


[flink] 05/05: [hotfix][table-common] Fix invalid class to data type conversion

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b9fb2b6d07d54848e57d42333382a4553852491
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 28 16:21:30 2019 +0200

    [hotfix][table-common] Fix invalid class to data type conversion
---
 .../java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java | 2 +-
 .../java/org/apache/flink/table/types/ClassDataTypeConverterTest.java   | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
index a71c682..7001168 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
@@ -56,7 +56,7 @@ public final class ClassDataTypeConverter {
 		addDefaultDataType(double.class, DataTypes.DOUBLE());
 		addDefaultDataType(java.sql.Date.class, DataTypes.DATE());
 		addDefaultDataType(java.time.LocalDate.class, DataTypes.DATE());
-		addDefaultDataType(java.sql.Time.class, DataTypes.TIME(3));
+		addDefaultDataType(java.sql.Time.class, DataTypes.TIME(0));
 		addDefaultDataType(java.time.LocalTime.class, DataTypes.TIME(9));
 		addDefaultDataType(java.sql.Timestamp.class, DataTypes.TIMESTAMP(9));
 		addDefaultDataType(java.time.LocalDateTime.class, DataTypes.TIMESTAMP(9));
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ClassDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ClassDataTypeConverterTest.java
index d889dec..46781cb 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ClassDataTypeConverterTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ClassDataTypeConverterTest.java
@@ -52,6 +52,8 @@ public class ClassDataTypeConverterTest {
 
 				{Long.class, DataTypes.BIGINT().nullable().bridgedTo(Long.class)},
 
+				{java.sql.Time.class, DataTypes.TIME(0).nullable().bridgedTo(java.sql.Time.class)},
+
 				{BigDecimal.class, null},
 
 				{
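
The rationale for the one-character fix: java.sql.Time represents a time of day without fractional
seconds in its string form, so the old default of TIME(3) overstated its precision. A hypothetical
sketch of the corrected mapping (driver code not part of the commit; expected results follow the
test above):

    import java.util.Optional;

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.utils.ClassDataTypeConverter;

    public class TimeMappingExample {
        public static void main(String[] args) {
            // after this fix: TIME(0) instead of TIME(3)
            Optional<DataType> sqlTime =
                ClassDataTypeConverter.extractDataType(java.sql.Time.class);

            // java.time.LocalTime keeps nanoseconds and stays at TIME(9)
            Optional<DataType> localTime =
                ClassDataTypeConverter.extractDataType(java.time.LocalTime.class);

            System.out.println(sqlTime);
            System.out.println(localTime);
        }
    }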


[flink] 01/05: [hotfix][table-common] Update CHAR and BINARY in accordance with the SQL standard

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b91db9fe01066980be78df9e8e8f177c8bc6543
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 27 11:47:13 2019 +0200

    [hotfix][table-common] Update CHAR and BINARY in accordance with the SQL standard
---
 .../java/org/apache/flink/table/api/DataTypes.java |  2 +-
 .../flink/table/types/logical/BinaryType.java      | 26 ++++++++++++++++-
 .../apache/flink/table/types/logical/CharType.java | 33 ++++++++++++++++++----
 .../apache/flink/table/types/LogicalTypesTest.java |  2 +-
 4 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index dc7c319..288a2f6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -81,7 +81,7 @@ public final class DataTypes {
 
 	/**
 	 * Data type of a fixed-length character string {@code CHAR(n)} where {@code n} is the number
-	 * of code points. {@code n} must have a value between 1 and 255 (both inclusive).
+	 * of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE} (both inclusive).
 	 *
 	 * @see CharType
 	 */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
index e342d9e..25dbcd4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/BinaryType.java
@@ -32,10 +32,15 @@ import java.util.Set;
  * <p>The serialized string representation is {@code BINARY(n)} where {@code n} is the number of
  * bytes. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE} (both inclusive). If
  * no length is specified, {@code n} is equal to 1.
+ *
+ * <p>To express a zero-length binary string literal, this type also supports a length {@code n}
+ * of 0. However, this is not exposed through the API.
  */
 @PublicEvolving
 public final class BinaryType extends LogicalType {
 
+	public static final int EMPTY_LITERAL_LENGTH = 0;
+
 	public static final int MIN_LENGTH = 1;
 
 	public static final int MAX_LENGTH = Integer.MAX_VALUE;
@@ -72,13 +77,32 @@ public final class BinaryType extends LogicalType {
 		this(DEFAULT_LENGTH);
 	}
 
+	/**
+	 * Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}.
+	 */
+	private BinaryType(int length, boolean isNullable) {
+		super(isNullable, LogicalTypeRoot.BINARY);
+		this.length = length;
+	}
+
+	/**
+	 * The SQL standard defines that character string literals are allowed to be zero-length strings
+	 * (i.e., to contain no characters) even though it is not permitted to declare a type of zero length.
+	 * For consistent behavior, the same logic applies to binary strings.
+	 *
+	 * <p>This method enables this special kind of binary string.
+	 */
+	public static BinaryType ofEmptyLiteral() {
+		return new BinaryType(EMPTY_LITERAL_LENGTH, false);
+	}
+
 	public int getLength() {
 		return length;
 	}
 
 	@Override
 	public LogicalType copy(boolean isNullable) {
-		return new BinaryType(isNullable, length);
+		return new BinaryType(length, isNullable);
 	}
 
 	@Override
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
index fe870ce..8385d05 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
@@ -30,17 +30,22 @@ import java.util.Set;
  * Logical type of a fixed-length character string.
  *
  * <p>The serialized string representation is {@code CHAR(n)} where {@code n} is the number of
- * code points. {@code n} must have a value between 1 and 255 (both inclusive). If no length is
- * specified, {@code n} is equal to 1.
+ * code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE} (both inclusive).
+ * If no length is specified, {@code n} is equal to 1.
+ *
+ * <p>To express a zero-length character string literal, this type also supports a length
+ * {@code n} of 0. However, this is not exposed through the API.
  *
  * <p>A conversion from and to {@code byte[]} assumes UTF-8 encoding.
  */
 @PublicEvolving
 public final class CharType extends LogicalType {
 
+	public static final int EMPTY_LITERAL_LENGTH = 0;
+
 	public static final int MIN_LENGTH = 1;
 
-	public static final int MAX_LENGTH = 255;
+	public static final int MAX_LENGTH = Integer.MAX_VALUE;
 
 	public static final int DEFAULT_LENGTH = 1;
 
@@ -57,7 +62,7 @@ public final class CharType extends LogicalType {
 
 	public CharType(boolean isNullable, int length) {
 		super(isNullable, LogicalTypeRoot.CHAR);
-		if (length < MIN_LENGTH || length > MAX_LENGTH) {
+		if (length < MIN_LENGTH) {
 			throw new ValidationException(
 				String.format(
 					"Character string length must be between %d and %d (both inclusive).",
@@ -75,13 +80,31 @@ public final class CharType extends LogicalType {
 		this(DEFAULT_LENGTH);
 	}
 
+	/**
+	 * Helper constructor for {@link #ofEmptyLiteral()} and {@link #copy(boolean)}.
+	 */
+	private CharType(int length, boolean isNullable) {
+		super(isNullable, LogicalTypeRoot.CHAR);
+		this.length = length;
+	}
+
+	/**
+	 * The SQL standard defines that character string literals are allowed to be zero-length strings
+	 * (i.e., to contain no characters) even though it is not permitted to declare a type of zero length.
+	 *
+	 * <p>This method enables this special kind of character string.
+	 */
+	public static CharType ofEmptyLiteral() {
+		return new CharType(EMPTY_LITERAL_LENGTH, false);
+	}
+
 	public int getLength() {
 		return length;
 	}
 
 	@Override
 	public LogicalType copy(boolean isNullable) {
-		return new CharType(isNullable, length);
+		return new CharType(length, isNullable);
 	}
 
 	@Override
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index 853d37f..600b718 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -86,7 +86,7 @@ public class LogicalTypesTest {
 			new Class[]{String.class, byte[].class},
 			new Class[]{String.class, byte[].class},
 			new LogicalType[]{},
-			new CharType(12)
+			new CharType(Integer.MAX_VALUE)
 		);
 	}
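
To illustrate the zero-length literal support added in this commit, a hypothetical snippet (not
part of the patch) contrasting the public constructor with the new factory method:

    import org.apache.flink.table.types.logical.CharType;

    public class EmptyLiteralExample {
        public static void main(String[] args) {
            // declaring CHAR(0) through the public constructor is rejected ...
            try {
                new CharType(0);
            } catch (Exception e) {
                System.out.println("rejected: " + e.getMessage());
            }

            // ... but a zero-length string literal still needs a type:
            CharType empty = CharType.ofEmptyLiteral();
            System.out.println(empty.getLength()); // 0
        }
    }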