You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/29 06:19:34 UTC

[flink] 04/05: [hotfix][table-common] Add a value to data type converter

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 551b8ef507109218a428baac2d5682fec2381a4f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 27 12:45:13 2019 +0200

    [hotfix][table-common] Add a value to data type converter
---
 .../table/types/utils/ValueDataTypeConverter.java  | 231 +++++++++++++++++++++
 .../table/types/ValueDataTypeConverterTest.java    | 160 ++++++++++++++
 2 files changed, 391 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
new file mode 100644
index 0000000..3b311ce
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ValueDataTypeConverter.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Value-based data type extractor that supports extraction of clearly identifiable data types for
+ * input conversion.
+ *
+ * <p>This converter is more precise than {@link ClassDataTypeConverter} because it also considers
+ * nullability, length, precision, and scale of values.
+ */
+@Internal
+public final class ValueDataTypeConverter {
+
+	/**
+	 * Returns the clearly identifiable data type of a value, e.g. {@code DataTypes.BIGINT().notNull()}
+	 * for {@code 12L}. All {@link LogicalTypeFamily#PREDEFINED} types and arrays are supported.
+	 *
+	 * @param value value to inspect; {@code null} could be any type and is therefore not supported
+	 * @return NOT NULL type bridged to the value's class, or empty if no type is clearly identifiable
+	 */
+	public static Optional<DataType> extractDataType(Object value) {
+		if (value == null) {
+			return Optional.empty();
+		}
+
+		DataType convertedDataType = null;
+
+		if (value instanceof String) {
+			convertedDataType = convertToCharType((String) value);
+		}
+
+		// byte arrays have higher priority than regular arrays
+		else if (value instanceof byte[]) {
+			convertedDataType = convertToBinaryType((byte[]) value);
+		}
+
+		else if (value instanceof BigDecimal) {
+			convertedDataType = convertToDecimalType((BigDecimal) value);
+		}
+
+		else if (value instanceof java.time.LocalTime) {
+			convertedDataType = convertToTimeType((java.time.LocalTime) value);
+		}
+
+		else if (value instanceof java.time.LocalDateTime) {
+			convertedDataType = convertToTimestampType(((java.time.LocalDateTime) value).getNano());
+		}
+
+		else if (value instanceof java.sql.Timestamp) {
+			convertedDataType = convertToTimestampType(((java.sql.Timestamp) value).getNanos());
+		}
+
+		else if (value instanceof java.time.ZonedDateTime) {
+			convertedDataType = convertToZonedTimestampType(((java.time.ZonedDateTime) value).getNano());
+		}
+
+		else if (value instanceof java.time.OffsetDateTime) {
+			convertedDataType = convertToZonedTimestampType(((java.time.OffsetDateTime) value).getNano());
+		}
+
+		else if (value instanceof java.time.Instant) {
+			convertedDataType = convertToLocalZonedTimestampType(((java.time.Instant) value).getNano());
+		}
+		// months beyond 11 count towards the years: Period.ofMonths(1200) needs YEAR(3), not YEAR(1)
+		else if (value instanceof java.time.Period) {
+			convertedDataType = convertToYearMonthIntervalType((int) (((java.time.Period) value).toTotalMonths() / 12));
+		}
+
+		else if (value instanceof java.time.Duration) {
+			final java.time.Duration duration = (java.time.Duration) value;
+			convertedDataType = convertToDayTimeIntervalType(duration.toDays(), duration.getNano());
+		}
+
+		else if (value instanceof Object[]) {
+			// return right away: the class-based extraction must not kick in if array elements differ
+			return convertToArrayType((Object[]) value)
+				.map(dt -> dt.notNull().bridgedTo(value.getClass()));
+		}
+
+		final Optional<DataType> resultType;
+		if (convertedDataType != null) {
+			resultType = Optional.of(convertedDataType);
+		} else {
+			// class-based extraction is possible for BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT,
+			// DOUBLE, DATE, TIME with java.sql.Time, and arrays of primitive types
+			resultType = ClassDataTypeConverter.extractDataType(value.getClass());
+		}
+		return resultType.map(dt -> dt.notNull().bridgedTo(value.getClass()));
+	}
+
+	private static DataType convertToCharType(String string) {
+		if (!string.isEmpty()) {
+			return DataTypes.CHAR(string.length());
+		}
+		return new AtomicDataType(CharType.ofEmptyLiteral()); // dedicated type of the "" literal
+	}
+
+	private static DataType convertToBinaryType(byte[] bytes) {
+		if (bytes.length > 0) {
+			return DataTypes.BINARY(bytes.length);
+		}
+		return new AtomicDataType(BinaryType.ofEmptyLiteral()); // dedicated type of the empty literal
+	}
+
+	private static DataType convertToDecimalType(BigDecimal decimal) {
+		// let underlying layers check if precision and scale are supported
+		return DataTypes.DECIMAL(decimal.precision(), decimal.scale());
+	}
+
+	private static DataType convertToTimeType(java.time.LocalTime localTime) {
+		return DataTypes.TIME(fractionalSecondPrecision(localTime.getNano())); // precision from nano-of-second
+	}
+
+	private static DataType convertToTimestampType(int nanoOfSecond) {
+		return DataTypes.TIMESTAMP(fractionalSecondPrecision(nanoOfSecond)); // precision from nano-of-second
+	}
+
+	private static DataType convertToZonedTimestampType(int nanoOfSecond) {
+		return DataTypes.TIMESTAMP_WITH_TIME_ZONE(fractionalSecondPrecision(nanoOfSecond)); // precision from nano-of-second
+	}
+
+	private static DataType convertToLocalZonedTimestampType(int nanoOfSecond) {
+		return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(fractionalSecondPrecision(nanoOfSecond)); // precision from nano-of-second
+	}
+
+	private static DataType convertToYearMonthIntervalType(int yearCount) {
+		return DataTypes.INTERVAL(DataTypes.YEAR(yearPrecision(yearCount)), DataTypes.MONTH()); // YEAR(p) TO MONTH
+	}
+
+	private static DataType convertToDayTimeIntervalType(long days, int nanos) {
+		return DataTypes.INTERVAL(
+			DataTypes.DAY(dayPrecision(days)),
+			DataTypes.SECOND(fractionalSecondPrecision(nanos)));
+	}
+
+	private static Optional<DataType> convertToArrayType(Object[] array) {
+		// fallback to class based-extraction if no values exist
+		if (array.length == 0 || Stream.of(array).allMatch(Objects::isNull)) {
+			return extractElementTypeFromClass(array);
+		}
+
+		return extractElementTypeFromValues(array);
+	}
+
+	private static Optional<DataType> extractElementTypeFromValues(Object[] array) {
+		// type shared by all non-null elements seen so far
+		DataType commonType = null;
+		for (Object item : array) {
+			// a null element fits any element type and therefore carries no information
+			if (item == null) {
+				continue;
+			}
+
+			final Optional<DataType> itemType = extractDataType(item);
+			if (!itemType.isPresent()) {
+				// a single unidentifiable element makes the whole array unidentifiable
+				return Optional.empty();
+			}
+
+			// element types are always treated as nullable for simplification;
+			// otherwise mismatches could occur when dealing with nested arrays
+			final DataType candidate = itemType.get().nullable();
+
+			// no widening on mismatch: turning [CHAR(1), CHAR(2)] into an array of CHAR(2)
+			// would modify values (i.e. add spaces), which is not intended
+			if (commonType == null) {
+				commonType = candidate;
+			} else if (!candidate.equals(commonType)) {
+				return Optional.empty();
+			}
+		}
+
+		return Optional.ofNullable(commonType).map(DataTypes::ARRAY);
+	}
+
+	private static Optional<DataType> extractElementTypeFromClass(Object[] array) {
+		// the declared component class is the only type information that is inspected here
+		final Class<?> componentClass = array.getClass().getComponentType();
+
+		// for simplification, array elements are always assumed to be nullable
+		return ClassDataTypeConverter.extractDataType(componentClass)
+			.map(DataType::nullable)
+			.map(DataTypes::ARRAY);
+	}
+
+	private static int fractionalSecondPrecision(int nanos) { // digits of the 9-digit nano field without trailing zeros
+		return String.format(java.util.Locale.ROOT, "%09d", nanos).replaceAll("0+$", "").length(); // ROOT: ASCII digits
+	}
+
+	private static int yearPrecision(int years) {
+		return String.valueOf(Math.abs((long) years)).length(); // digits only: the sign of a negative value is no digit
+	}
+
+	private static int dayPrecision(long days) {
+		return String.valueOf(days).replace("-", "").length(); // digits only: the sign of a negative value is no digit
+	}
+
+	private ValueDataTypeConverter() {
+		// utility class that only offers static methods; never instantiated
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java
new file mode 100644
index 0000000..9c00181
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/ValueDataTypeConverterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Period;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ValueDataTypeConverter}.
+ */
+@RunWith(Parameterized.class)
+public class ValueDataTypeConverterTest {
+
+	@Parameterized.Parameters(name = "[{index}] value: {0} type: {1}")
+	public static List<Object[]> testData() {
+		return Arrays.asList(
+			new Object[][]{
+				// each entry is {value, expected data type}; the test adds NOT NULL to the expected type
+				{"Hello World", DataTypes.CHAR(11)},
+
+				{"", new AtomicDataType(CharType.ofEmptyLiteral())},
+				// byte arrays: the number of bytes determines the BINARY length
+				{new byte[]{1, 2, 3}, DataTypes.BINARY(3)},
+
+				{new byte[0], new AtomicDataType(BinaryType.ofEmptyLiteral())},
+				// decimals: precision and scale are taken from the value
+				{BigDecimal.ZERO, DataTypes.DECIMAL(1, 0)},
+
+				{new BigDecimal("12.123"), DataTypes.DECIMAL(5, 3)},
+
+				{12, DataTypes.INT()},
+				// times: the precision is the number of fractional second digits without trailing zeros
+				{LocalTime.of(13, 24, 25, 1000), DataTypes.TIME(6)},
+
+				{LocalTime.of(13, 24, 25, 0), DataTypes.TIME(0)},
+
+				{LocalTime.of(13, 24, 25, 1), DataTypes.TIME(9)},
+
+				{LocalTime.of(13, 24, 25, 999_999_999), DataTypes.TIME(9)},
+				// timestamps with and without time zone: the precision again follows the fractional digits
+				{LocalDateTime.of(2019, 11, 11, 13, 24, 25, 1001), DataTypes.TIMESTAMP(9)},
+
+				{
+					ZonedDateTime.of(2019, 11, 11, 13, 24, 25, 1001, ZoneId.systemDefault()),
+					DataTypes.TIMESTAMP_WITH_TIME_ZONE(9).bridgedTo(ZonedDateTime.class)
+				},
+
+				{
+					OffsetDateTime.of(2019, 11, 11, 13, 24, 25, 1001, ZoneOffset.UTC),
+					DataTypes.TIMESTAMP_WITH_TIME_ZONE(9).bridgedTo(OffsetDateTime.class)
+				},
+
+				{
+					Instant.ofEpochMilli(12345602021L),
+					DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).bridgedTo(Instant.class)
+				},
+				// intervals: the number of digits of the years/days determines the leading precision
+				{
+					Period.ofYears(1000),
+					DataTypes.INTERVAL(DataTypes.YEAR(4), DataTypes.MONTH()).bridgedTo(Period.class)
+				},
+
+				{
+					Duration.ofMillis(1100),
+					DataTypes.INTERVAL(DataTypes.DAY(1), DataTypes.SECOND(1)).bridgedTo(Duration.class)
+				},
+
+				{
+					Duration.ofDays(42),
+					DataTypes.INTERVAL(DataTypes.DAY(2), DataTypes.SECOND(0)).bridgedTo(Duration.class)
+				},
+
+				{
+					Timestamp.valueOf("2018-01-01 12:13:14.123"),
+					DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)
+				},
+				// arrays: null elements are ignored; empty arrays of Integer still yield an INT element type
+				{new Integer[]{1, 2, 3}, DataTypes.ARRAY(DataTypes.INT())},
+
+				{new Integer[]{1, null, 3}, DataTypes.ARRAY(DataTypes.INT())},
+
+				{
+					new BigDecimal[]{new BigDecimal("12.1234"), new BigDecimal("42.4321"), new BigDecimal("20.0000")},
+					DataTypes.ARRAY(DataTypes.DECIMAL(6, 4))
+				},
+
+				{
+					new BigDecimal[]{null, new BigDecimal("42.4321")},
+					DataTypes.ARRAY(DataTypes.DECIMAL(6, 4))
+				},
+
+				{new Integer[0], DataTypes.ARRAY(DataTypes.INT())},
+
+				{
+					new Integer[][]{new Integer[]{1, null, 3}, new Integer[0], new Integer[]{1}},
+					DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))
+				},
+				// empty array of BigDecimal: no data type is expected (null stands for an empty result)
+				{
+					new BigDecimal[0],
+					null
+				},
+			}
+		);
+	}
+
+	@Parameterized.Parameter
+	public Object value;
+
+	@Parameterized.Parameter(1)
+	public @Nullable DataType dataType;
+
+	@Test
+	public void testClassToDataTypeConversion() {
+		// expectations are compared as NOT NULL types; a null expectation becomes an empty Optional
+		final Optional<DataType> expected = Optional.ofNullable(dataType).map(DataType::notNull);
+		assertEquals(expected, ValueDataTypeConverter.extractDataType(value));
+	}
+}