You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/16 09:09:32 UTC
[flink] branch master updated: [FLINK-12254][table-common] Add a
logical type to data type converter
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 72bf740 [FLINK-12254][table-common] Add a logical type to data type converter
72bf740 is described below
commit 72bf7408d0e26518eaec22cc68e2be7baad2252a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 15 13:14:45 2019 +0200
[FLINK-12254][table-common] Add a logical type to data type converter
This closes #8453.
---
.../types/utils/LogicalTypeDataTypeConverter.java | 248 +++++++++++++++++++++
.../apache/flink/table/types/DataTypesTest.java | 16 +-
2 files changed, 263 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java
new file mode 100644
index 0000000..eb1c813
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A converter between {@link LogicalType} and {@link DataType}.
+ */
+public final class LogicalTypeDataTypeConverter {
+
+ private static final DefaultDataTypeCreator dataTypeCreator = new DefaultDataTypeCreator();
+
+ /**
+ * Returns the data type of a logical type without explicit conversions.
+ */
+ public static DataType toDataType(LogicalType logicalType) {
+ return logicalType.accept(dataTypeCreator);
+ }
+
+ /**
+ * Returns the logical type of a data type.
+ */
+ public static LogicalType toLogicalType(DataType dataType) {
+ return dataType.getLogicalType();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static class DefaultDataTypeCreator implements LogicalTypeVisitor<DataType> {
+
+ @Override
+ public DataType visit(CharType charType) {
+ return new AtomicDataType(charType);
+ }
+
+ @Override
+ public DataType visit(VarCharType varCharType) {
+ return new AtomicDataType(varCharType);
+ }
+
+ @Override
+ public DataType visit(BooleanType booleanType) {
+ return new AtomicDataType(booleanType);
+ }
+
+ @Override
+ public DataType visit(BinaryType binaryType) {
+ return new AtomicDataType(binaryType);
+ }
+
+ @Override
+ public DataType visit(VarBinaryType varBinaryType) {
+ return new AtomicDataType(varBinaryType);
+ }
+
+ @Override
+ public DataType visit(DecimalType decimalType) {
+ return new AtomicDataType(decimalType);
+ }
+
+ @Override
+ public DataType visit(TinyIntType tinyIntType) {
+ return new AtomicDataType(tinyIntType);
+ }
+
+ @Override
+ public DataType visit(SmallIntType smallIntType) {
+ return new AtomicDataType(smallIntType);
+ }
+
+ @Override
+ public DataType visit(IntType intType) {
+ return new AtomicDataType(intType);
+ }
+
+ @Override
+ public DataType visit(BigIntType bigIntType) {
+ return new AtomicDataType(bigIntType);
+ }
+
+ @Override
+ public DataType visit(FloatType floatType) {
+ return new AtomicDataType(floatType);
+ }
+
+ @Override
+ public DataType visit(DoubleType doubleType) {
+ return new AtomicDataType(doubleType);
+ }
+
+ @Override
+ public DataType visit(DateType dateType) {
+ return new AtomicDataType(dateType);
+ }
+
+ @Override
+ public DataType visit(TimeType timeType) {
+ return new AtomicDataType(timeType);
+ }
+
+ @Override
+ public DataType visit(TimestampType timestampType) {
+ return new AtomicDataType(timestampType);
+ }
+
+ @Override
+ public DataType visit(ZonedTimestampType zonedTimestampType) {
+ return new AtomicDataType(zonedTimestampType);
+ }
+
+ @Override
+ public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
+ return new AtomicDataType(localZonedTimestampType);
+ }
+
+ @Override
+ public DataType visit(YearMonthIntervalType yearMonthIntervalType) {
+ return new AtomicDataType(yearMonthIntervalType);
+ }
+
+ @Override
+ public DataType visit(DayTimeIntervalType dayTimeIntervalType) {
+ return new AtomicDataType(dayTimeIntervalType);
+ }
+
+ @Override
+ public DataType visit(ArrayType arrayType) {
+ return new CollectionDataType(
+ arrayType,
+ arrayType.getElementType().accept(this));
+ }
+
+ @Override
+ public DataType visit(MultisetType multisetType) {
+ return new CollectionDataType(
+ multisetType,
+ multisetType.getElementType().accept(this));
+ }
+
+ @Override
+ public DataType visit(MapType mapType) {
+ return new KeyValueDataType(
+ mapType,
+ mapType.getKeyType().accept(this),
+ mapType.getValueType().accept(this));
+ }
+
+ @Override
+ public DataType visit(RowType rowType) {
+ final Map<String, DataType> fieldDataTypes = rowType.getFields()
+ .stream()
+ .collect(Collectors.toMap(RowType.RowField::getName, f -> f.getType().accept(this)));
+ return new FieldsDataType(
+ rowType,
+ fieldDataTypes);
+ }
+
+ @Override
+ public DataType visit(DistinctType distinctType) {
+ return distinctType.getSourceType().accept(this);
+ }
+
+ @Override
+ public DataType visit(StructuredType structuredType) {
+ final Map<String, DataType> attributeDataTypes = structuredType.getAttributes()
+ .stream()
+ .collect(Collectors.toMap(StructuredAttribute::getName, a -> a.getType().accept(this)));
+ return new FieldsDataType(
+ structuredType,
+ attributeDataTypes);
+ }
+
+ @Override
+ public DataType visit(NullType nullType) {
+ return new AtomicDataType(nullType);
+ }
+
+ @Override
+ public DataType visit(AnyType anyType) {
+ return new AtomicDataType(anyType);
+ }
+
+ @Override
+ public DataType visit(LogicalType other) {
+ return new AtomicDataType(other);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private LogicalTypeDataTypeConverter() {
+ // do not instantiate
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index a1f4289..badfc9a 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
import org.apache.flink.types.Row;
import org.junit.Test;
@@ -96,10 +97,13 @@ import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
import static org.apache.flink.table.types.TypeTestingUtils.hasLogicalType;
import static org.apache.flink.table.types.logical.DayTimeIntervalType.DEFAULT_DAY_PRECISION;
import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter.toDataType;
+import static org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter.toLogicalType;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
/**
- * Tests for {@link DataTypes}.
+ * Tests for {@link DataTypes} and {@link LogicalTypeDataTypeConverter}.
*/
@RunWith(Parameterized.class)
public class DataTypesTest {
@@ -208,4 +212,14 @@ public class DataTypesTest {
public void testConversionClass() {
assertThat(dataType, hasConversionClass(expectedConversionClass));
}
+
+ @Test
+ public void testLogicalTypeToDataTypeConversion() {
+ assertThat(toDataType(expectedLogicalType), equalTo(dataType));
+ }
+
+ @Test
+ public void testDataTypeToLogicalTypeConversion() {
+ assertThat(toLogicalType(dataType), equalTo(expectedLogicalType));
+ }
}