You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/29 06:19:34 UTC
[flink] 04/05: [hotfix][table-common] Add a value to data type
converter
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 551b8ef507109218a428baac2d5682fec2381a4f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 27 12:45:13 2019 +0200
[hotfix][table-common] Add a value to data type converter
---
.../table/types/utils/ValueDataTypeConverter.java | 231 +++++++++++++++++++++
.../table/types/ValueDataTypeConverterTest.java | 160 ++++++++++++++
2 files changed, 391 insertions(+)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
new file mode 100644
index 0000000..3b311ce
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Value-based data type extractor that supports extraction of clearly identifiable data types for
+ * input conversion.
+ *
+ * <p>This converter is more precise than {@link ClassDataTypeConverter} because it also considers
+ * nullability, length, precision, and scale of values.
+ */
+@Internal
+public final class ValueDataTypeConverter {
+
+    /**
+     * Returns the clearly identifiable data type if possible. For example, {@code 12L} can be
+     * expressed as {@code DataTypes.BIGINT().notNull()}. However, for example, {@code null} could
+     * be any type and is not supported.
+     *
+     * <p>All types of the {@link LogicalTypeFamily#PREDEFINED} family and arrays are supported.
+     *
+     * @param value the value to inspect, may be {@code null}
+     * @return the non-nullable data type bridged to the class of {@code value}, or an empty
+     *     optional if no data type could be derived
+     */
+    public static Optional<DataType> extractDataType(Object value) {
+        if (value == null) {
+            return Optional.empty();
+        }
+
+        final Optional<DataType> extractedType;
+        if (value instanceof Object[]) {
+            // don't let the class-based extraction kick in if array elements differ
+            extractedType = convertToArrayType((Object[]) value);
+        } else {
+            final Optional<DataType> valueBasedType = extractFromValue(value);
+            // class-based extraction is possible for BOOLEAN, TINYINT, SMALLINT, INT, FLOAT,
+            // DOUBLE, DATE, TIME with java.sql.Time, and arrays of primitive types
+            extractedType = valueBasedType.isPresent()
+                ? valueBasedType
+                : ClassDataTypeConverter.extractDataType(value.getClass());
+        }
+        return extractedType.map(dt -> dt.notNull().bridgedTo(value.getClass()));
+    }
+
+    /**
+     * Derives a data type from the content of a non-null value that is not an object array.
+     *
+     * @param value the non-null value to inspect
+     * @return the derived data type, or an empty optional if the class of {@code value} has no
+     *     value-based conversion
+     */
+    private static Optional<DataType> extractFromValue(Object value) {
+        if (value instanceof String) {
+            return Optional.of(convertToCharType((String) value));
+        }
+        // byte arrays have higher priority than regular arrays
+        if (value instanceof byte[]) {
+            return Optional.of(convertToBinaryType((byte[]) value));
+        }
+        if (value instanceof BigDecimal) {
+            return Optional.of(convertToDecimalType((BigDecimal) value));
+        }
+        if (value instanceof java.time.LocalTime) {
+            return Optional.of(convertToTimeType((java.time.LocalTime) value));
+        }
+        if (value instanceof java.time.LocalDateTime) {
+            final int nanos = ((java.time.LocalDateTime) value).getNano();
+            return Optional.of(convertToTimestampType(nanos));
+        }
+        if (value instanceof java.sql.Timestamp) {
+            final int nanos = ((java.sql.Timestamp) value).getNanos();
+            return Optional.of(convertToTimestampType(nanos));
+        }
+        if (value instanceof java.time.ZonedDateTime) {
+            final int nanos = ((java.time.ZonedDateTime) value).getNano();
+            return Optional.of(convertToZonedTimestampType(nanos));
+        }
+        if (value instanceof java.time.OffsetDateTime) {
+            final int nanos = ((java.time.OffsetDateTime) value).getNano();
+            return Optional.of(convertToZonedTimestampType(nanos));
+        }
+        if (value instanceof java.time.Instant) {
+            final int nanos = ((java.time.Instant) value).getNano();
+            return Optional.of(convertToLocalZonedTimestampType(nanos));
+        }
+        if (value instanceof java.time.Period) {
+            // NOTE(review): only the years field is inspected; presumably periods are expected
+            // to be normalized (months < 12) by the caller - confirm
+            final int years = ((java.time.Period) value).getYears();
+            return Optional.of(convertToYearMonthIntervalType(years));
+        }
+        if (value instanceof java.time.Duration) {
+            final java.time.Duration duration = (java.time.Duration) value;
+            return Optional.of(
+                convertToDayTimeIntervalType(duration.toDays(), duration.getNano()));
+        }
+        return Optional.empty();
+    }
+
+    /** Maps a string to a CHAR type of the string's length. */
+    private static DataType convertToCharType(String string) {
+        // empty strings are represented by the dedicated empty literal type
+        if (string.isEmpty()) {
+            return new AtomicDataType(CharType.ofEmptyLiteral());
+        }
+        return DataTypes.CHAR(string.length());
+    }
+
+    /** Maps a byte array to a BINARY type of the array's length. */
+    private static DataType convertToBinaryType(byte[] bytes) {
+        // empty byte arrays are represented by the dedicated empty literal type
+        if (bytes.length == 0) {
+            return new AtomicDataType(BinaryType.ofEmptyLiteral());
+        }
+        return DataTypes.BINARY(bytes.length);
+    }
+
+    /** Maps a decimal to a DECIMAL type with the precision and scale of the value. */
+    private static DataType convertToDecimalType(BigDecimal decimal) {
+        // let underlying layers check if precision and scale are supported
+        return DataTypes.DECIMAL(decimal.precision(), decimal.scale());
+    }
+
+    private static DataType convertToTimeType(java.time.LocalTime time) {
+        return DataTypes.TIME(fractionalSecondPrecision(time.getNano()));
+    }
+
+    private static DataType convertToTimestampType(int nanos) {
+        return DataTypes.TIMESTAMP(fractionalSecondPrecision(nanos));
+    }
+
+    private static DataType convertToZonedTimestampType(int nanos) {
+        return DataTypes.TIMESTAMP_WITH_TIME_ZONE(fractionalSecondPrecision(nanos));
+    }
+
+    private static DataType convertToLocalZonedTimestampType(int nanos) {
+        return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(fractionalSecondPrecision(nanos));
+    }
+
+    private static DataType convertToYearMonthIntervalType(int years) {
+        return DataTypes.INTERVAL(DataTypes.YEAR(yearPrecision(years)), DataTypes.MONTH());
+    }
+
+    private static DataType convertToDayTimeIntervalType(long days, int nanos) {
+        return DataTypes.INTERVAL(
+            DataTypes.DAY(dayPrecision(days)),
+            DataTypes.SECOND(fractionalSecondPrecision(nanos)));
+    }
+
+    /**
+     * Derives an array type from an object array. The element type is taken from the elements
+     * if at least one of them is non-null and from the array's component class otherwise.
+     */
+    private static Optional<DataType> convertToArrayType(Object[] array) {
+        // fallback to class based-extraction if no values exist
+        if (array.length == 0 || Stream.of(array).allMatch(Objects::isNull)) {
+            return extractElementTypeFromClass(array);
+        }
+
+        return extractElementTypeFromValues(array);
+    }
+
+    /**
+     * Derives an array type from the non-null elements of the given array.
+     *
+     * @return the array type, or an empty optional if the type of an element cannot be extracted
+     *     or if the elements do not all have the same type
+     */
+    private static Optional<DataType> extractElementTypeFromValues(Object[] array) {
+        DataType elementType = null;
+        for (Object element : array) {
+            // null values are wildcard array elements
+            if (element == null) {
+                continue;
+            }
+
+            final Optional<DataType> possibleElementType = extractDataType(element);
+            if (!possibleElementType.isPresent()) {
+                return Optional.empty();
+            }
+
+            // for simplification, we assume that array elements can always be nullable
+            // otherwise mismatches could occur when dealing with nested arrays
+            final DataType extractedElementType = possibleElementType.get().nullable();
+
+            // ensure that all elements have the same type;
+            // in theory the logic could be improved by converting an array with elements
+            // [CHAR(1), CHAR(2)] into an array of CHAR(2) but this can lead to value
+            // modification (i.e. adding spaces) which is not intended.
+            if (elementType != null && !extractedElementType.equals(elementType)) {
+                return Optional.empty();
+            }
+            elementType = extractedElementType;
+        }
+
+        return Optional.ofNullable(elementType)
+            .map(DataTypes::ARRAY);
+    }
+
+    /** Derives an array type from the component class of the given array. */
+    private static Optional<DataType> extractElementTypeFromClass(Object[] array) {
+        final Optional<DataType> possibleElementType =
+            ClassDataTypeConverter.extractDataType(array.getClass().getComponentType());
+
+        // for simplification, we assume that array elements can always be nullable
+        return possibleElementType
+            .map(DataType::nullable)
+            .map(DataTypes::ARRAY);
+    }
+
+    /**
+     * Returns the number of fractional second digits that are required to represent the given
+     * nanoseconds, i.e. 9 minus the number of trailing decimal zeros. For example, {@code 0}
+     * results in 0, {@code 123_000_000} in 3, and {@code 1} in 9.
+     */
+    private static int fractionalSecondPrecision(int nanos) {
+        // plain integer arithmetic on purpose: String.format("%09d", ...) uses the default
+        // locale and may emit locale-specific digits that a "0+$" pattern does not strip
+        int precision = 9;
+        for (int rest = nanos; precision > 0 && rest % 10 == 0; rest /= 10) {
+            precision--;
+        }
+        return precision;
+    }
+
+    /** Returns the number of decimal digits of the given years; the sign is not a digit. */
+    private static int yearPrecision(int years) {
+        return countDigits(years);
+    }
+
+    /** Returns the number of decimal digits of the given days; the sign is not a digit. */
+    private static int dayPrecision(long days) {
+        return countDigits(days);
+    }
+
+    /**
+     * Counts the decimal digits of the given number while ignoring its sign. For example,
+     * {@code 0} results in 1, {@code 1000} in 4, and {@code -42} in 2.
+     */
+    private static int countDigits(long number) {
+        // dividing instead of negating also works for Long.MIN_VALUE
+        int digits = 1;
+        for (long rest = number / 10; rest != 0; rest /= 10) {
+            digits++;
+        }
+        return digits;
+    }
+
+    private ValueDataTypeConverter() {
+        // no instantiation
+    }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java
new file mode 100644
index 0000000..9c00181
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ValueDataTypeConverter}.
+ *
+ * <p>Every parameter set pairs an input value with the data type that is expected to be extracted
+ * from it. A {@code null} data type means that no data type is expected to be extracted.
+ */
+@RunWith(Parameterized.class)
+public class ValueDataTypeConverterTest {
+
+    @Parameterized.Parameters(name = "[{index}] value: {0} type: {1}")
+    public static List<Object[]> testData() {
+        return Arrays.asList(
+            // character and binary strings
+            testCase("Hello World", DataTypes.CHAR(11)),
+            testCase("", new AtomicDataType(CharType.ofEmptyLiteral())),
+            testCase(new byte[]{1, 2, 3}, DataTypes.BINARY(3)),
+            testCase(new byte[0], new AtomicDataType(BinaryType.ofEmptyLiteral())),
+
+            // exact numerics
+            testCase(BigDecimal.ZERO, DataTypes.DECIMAL(1, 0)),
+            testCase(new BigDecimal("12.123"), DataTypes.DECIMAL(5, 3)),
+            testCase(12, DataTypes.INT()),
+
+            // time with different fractional second precisions
+            testCase(LocalTime.of(13, 24, 25, 1000), DataTypes.TIME(6)),
+            testCase(LocalTime.of(13, 24, 25, 0), DataTypes.TIME(0)),
+            testCase(LocalTime.of(13, 24, 25, 1), DataTypes.TIME(9)),
+            testCase(LocalTime.of(13, 24, 25, 999_999_999), DataTypes.TIME(9)),
+
+            // timestamps
+            testCase(
+                LocalDateTime.of(2019, 11, 11, 13, 24, 25, 1001),
+                DataTypes.TIMESTAMP(9)),
+            testCase(
+                ZonedDateTime.of(2019, 11, 11, 13, 24, 25, 1001, ZoneId.systemDefault()),
+                DataTypes.TIMESTAMP_WITH_TIME_ZONE(9).bridgedTo(ZonedDateTime.class)),
+            testCase(
+                OffsetDateTime.of(2019, 11, 11, 13, 24, 25, 1001, ZoneOffset.UTC),
+                DataTypes.TIMESTAMP_WITH_TIME_ZONE(9).bridgedTo(OffsetDateTime.class)),
+            testCase(
+                Instant.ofEpochMilli(12345602021L),
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).bridgedTo(Instant.class)),
+
+            // intervals
+            testCase(
+                Period.ofYears(1000),
+                DataTypes.INTERVAL(DataTypes.YEAR(4), DataTypes.MONTH()).bridgedTo(Period.class)),
+            testCase(
+                Duration.ofMillis(1100),
+                DataTypes.INTERVAL(DataTypes.DAY(1), DataTypes.SECOND(1)).bridgedTo(Duration.class)),
+            testCase(
+                Duration.ofDays(42),
+                DataTypes.INTERVAL(DataTypes.DAY(2), DataTypes.SECOND(0)).bridgedTo(Duration.class)),
+
+            // SQL timestamp
+            testCase(
+                Timestamp.valueOf("2018-01-01 12:13:14.123"),
+                DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)),
+
+            // arrays with and without null elements
+            testCase(new Integer[]{1, 2, 3}, DataTypes.ARRAY(DataTypes.INT())),
+            testCase(new Integer[]{1, null, 3}, DataTypes.ARRAY(DataTypes.INT())),
+            testCase(
+                new BigDecimal[]{
+                    new BigDecimal("12.1234"),
+                    new BigDecimal("42.4321"),
+                    new BigDecimal("20.0000")
+                },
+                DataTypes.ARRAY(DataTypes.DECIMAL(6, 4))),
+            testCase(
+                new BigDecimal[]{null, new BigDecimal("42.4321")},
+                DataTypes.ARRAY(DataTypes.DECIMAL(6, 4))),
+
+            // empty and nested arrays
+            testCase(new Integer[0], DataTypes.ARRAY(DataTypes.INT())),
+            testCase(
+                new Integer[][]{new Integer[]{1, null, 3}, new Integer[0], new Integer[]{1}},
+                DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))),
+
+            // no data type is expected for an empty decimal array
+            testCase(new BigDecimal[0], null)
+        );
+    }
+
+    /** Input value that is passed to the converter. */
+    @Parameterized.Parameter
+    public Object value;
+
+    /** Expected data type before nullability is removed, {@code null} if none is expected. */
+    @Parameterized.Parameter(1)
+    public @Nullable DataType dataType;
+
+    @Test
+    public void testClassToDataTypeConversion() {
+        // the expected data type is declared without nullability information in the test data
+        final Optional<DataType> expected = Optional.ofNullable(dataType).map(DataType::notNull);
+        final Optional<DataType> actual = ValueDataTypeConverter.extractDataType(value);
+
+        assertEquals(expected, actual);
+    }
+
+    /** Creates one parameter set consisting of the input value and the expected data type. */
+    private static Object[] testCase(Object value, @Nullable DataType expectedType) {
+        return new Object[]{value, expectedType};
+    }
+}