Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/16 09:09:32 UTC

[flink] branch master updated: [FLINK-12254][table-common] Add a logical type to data type converter

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 72bf740  [FLINK-12254][table-common] Add a logical type to data type converter
72bf740 is described below

commit 72bf7408d0e26518eaec22cc68e2be7baad2252a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 15 13:14:45 2019 +0200

    [FLINK-12254][table-common] Add a logical type to data type converter
    
    This closes #8453.
---
 .../types/utils/LogicalTypeDataTypeConverter.java  | 248 +++++++++++++++++++++
 .../apache/flink/table/types/DataTypesTest.java    |  16 +-
 2 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java
new file mode 100644
index 0000000..eb1c813
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A converter between {@link LogicalType} and {@link DataType}.
+ */
+public final class LogicalTypeDataTypeConverter {
+
+	private static final DefaultDataTypeCreator dataTypeCreator = new DefaultDataTypeCreator();
+
+	/**
+	 * Returns the data type of a logical type without explicit conversions.
+	 */
+	public static DataType toDataType(LogicalType logicalType) {
+		return logicalType.accept(dataTypeCreator);
+	}
+
+	/**
+	 * Returns the logical type of a data type.
+	 */
+	public static LogicalType toLogicalType(DataType dataType) {
+		return dataType.getLogicalType();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class DefaultDataTypeCreator implements LogicalTypeVisitor<DataType> {
+
+		@Override
+		public DataType visit(CharType charType) {
+			return new AtomicDataType(charType);
+		}
+
+		@Override
+		public DataType visit(VarCharType varCharType) {
+			return new AtomicDataType(varCharType);
+		}
+
+		@Override
+		public DataType visit(BooleanType booleanType) {
+			return new AtomicDataType(booleanType);
+		}
+
+		@Override
+		public DataType visit(BinaryType binaryType) {
+			return new AtomicDataType(binaryType);
+		}
+
+		@Override
+		public DataType visit(VarBinaryType varBinaryType) {
+			return new AtomicDataType(varBinaryType);
+		}
+
+		@Override
+		public DataType visit(DecimalType decimalType) {
+			return new AtomicDataType(decimalType);
+		}
+
+		@Override
+		public DataType visit(TinyIntType tinyIntType) {
+			return new AtomicDataType(tinyIntType);
+		}
+
+		@Override
+		public DataType visit(SmallIntType smallIntType) {
+			return new AtomicDataType(smallIntType);
+		}
+
+		@Override
+		public DataType visit(IntType intType) {
+			return new AtomicDataType(intType);
+		}
+
+		@Override
+		public DataType visit(BigIntType bigIntType) {
+			return new AtomicDataType(bigIntType);
+		}
+
+		@Override
+		public DataType visit(FloatType floatType) {
+			return new AtomicDataType(floatType);
+		}
+
+		@Override
+		public DataType visit(DoubleType doubleType) {
+			return new AtomicDataType(doubleType);
+		}
+
+		@Override
+		public DataType visit(DateType dateType) {
+			return new AtomicDataType(dateType);
+		}
+
+		@Override
+		public DataType visit(TimeType timeType) {
+			return new AtomicDataType(timeType);
+		}
+
+		@Override
+		public DataType visit(TimestampType timestampType) {
+			return new AtomicDataType(timestampType);
+		}
+
+		@Override
+		public DataType visit(ZonedTimestampType zonedTimestampType) {
+			return new AtomicDataType(zonedTimestampType);
+		}
+
+		@Override
+		public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
+			return new AtomicDataType(localZonedTimestampType);
+		}
+
+		@Override
+		public DataType visit(YearMonthIntervalType yearMonthIntervalType) {
+			return new AtomicDataType(yearMonthIntervalType);
+		}
+
+		@Override
+		public DataType visit(DayTimeIntervalType dayTimeIntervalType) {
+			return new AtomicDataType(dayTimeIntervalType);
+		}
+
+		@Override
+		public DataType visit(ArrayType arrayType) {
+			return new CollectionDataType(
+				arrayType,
+				arrayType.getElementType().accept(this));
+		}
+
+		@Override
+		public DataType visit(MultisetType multisetType) {
+			return new CollectionDataType(
+				multisetType,
+				multisetType.getElementType().accept(this));
+		}
+
+		@Override
+		public DataType visit(MapType mapType) {
+			return new KeyValueDataType(
+				mapType,
+				mapType.getKeyType().accept(this),
+				mapType.getValueType().accept(this));
+		}
+
+		@Override
+		public DataType visit(RowType rowType) {
+			final Map<String, DataType> fieldDataTypes = rowType.getFields()
+				.stream()
+				.collect(Collectors.toMap(RowType.RowField::getName, f -> f.getType().accept(this)));
+			return new FieldsDataType(
+				rowType,
+				fieldDataTypes);
+		}
+
+		@Override
+		public DataType visit(DistinctType distinctType) {
+			return distinctType.getSourceType().accept(this);
+		}
+
+		@Override
+		public DataType visit(StructuredType structuredType) {
+			final Map<String, DataType> attributeDataTypes = structuredType.getAttributes()
+				.stream()
+				.collect(Collectors.toMap(StructuredAttribute::getName, a -> a.getType().accept(this)));
+			return new FieldsDataType(
+				structuredType,
+				attributeDataTypes);
+		}
+
+		@Override
+		public DataType visit(NullType nullType) {
+			return new AtomicDataType(nullType);
+		}
+
+		@Override
+		public DataType visit(AnyType anyType) {
+			return new AtomicDataType(anyType);
+		}
+
+		@Override
+		public DataType visit(LogicalType other) {
+			return new AtomicDataType(other);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private LogicalTypeDataTypeConverter() {
+		// do not instantiate
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index a1f4289..badfc9a 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -96,10 +97,13 @@ import static org.apache.flink.table.types.TypeTestingUtils.hasConversionClass;
 import static org.apache.flink.table.types.TypeTestingUtils.hasLogicalType;
 import static org.apache.flink.table.types.logical.DayTimeIntervalType.DEFAULT_DAY_PRECISION;
 import static org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution.MINUTE_TO_SECOND;
+import static org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter.toDataType;
+import static org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter.toLogicalType;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 /**
- * Tests for {@link DataTypes}.
+ * Tests for {@link DataTypes} and {@link LogicalTypeDataTypeConverter}.
  */
 @RunWith(Parameterized.class)
 public class DataTypesTest {
@@ -208,4 +212,14 @@ public class DataTypesTest {
 	public void testConversionClass() {
 		assertThat(dataType, hasConversionClass(expectedConversionClass));
 	}
+
+	@Test
+	public void testLogicalTypeToDataTypeConversion() {
+		assertThat(toDataType(expectedLogicalType), equalTo(dataType));
+	}
+
+	@Test
+	public void testDataTypeToLogicalTypeConversion() {
+		assertThat(toLogicalType(dataType), equalTo(expectedLogicalType));
+	}
 }
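
For context only (not part of the committed patch), a minimal usage sketch of the new converter as exercised by the tests above. The ConverterExample class name is made up for illustration; the referenced types are the ones imported by the patch, and the explicit VARCHAR length is an arbitrary choice.

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;

    public class ConverterExample {

        public static void main(String[] args) {
            // a logical type without any explicit conversion class hints
            final LogicalType logicalType = new VarCharType(255);

            // wrap it into a DataType using the default conversion classes
            final DataType dataType = LogicalTypeDataTypeConverter.toDataType(logicalType);

            // extracting the logical type again should be a loss-free round trip
            final LogicalType restored = LogicalTypeDataTypeConverter.toLogicalType(dataType);

            System.out.println(logicalType.equals(restored)); // true
        }
    }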