Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/23 08:01:25 UTC

[flink] branch master updated (7a3f081 -> 185ed0e)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7a3f081  [FLINK-12411][table-planner][tests] Workaround limited support of not nullable fields in window aggregation
     new 36fef44  [hotfix][table-common] Add default precision temporal data types
     new 1ec14eb  [hotfix][table-common] Add assumption about expressions and data types
     new a71c200  [hotfix][table-common] Fix equality of data types with same conversion class
     new d7d2442  [hotfix][table-common] Add logical type check utilities
     new 3b13b60  [hotfix][table-common] Fix invalid class to data type conversion
     new 185ed0e  [FLINK-12254][table-common] Add a converter between old type information behavior and data type

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/table/api/DataTypes.java |  83 +++++
 .../apache/flink/table/expressions/Expression.java |   6 +-
 .../flink/table/types/CollectionDataType.java      |  26 +-
 .../org/apache/flink/table/types/DataType.java     |  20 +-
 .../types/logical/LegacyTypeInformationType.java   | 120 +++++++
 .../types/logical/utils/LogicalTypeChecks.java     |  87 +++++
 .../table/types/utils/ClassDataTypeConverter.java  |   2 +-
 .../utils/LegacyTypeInfoDataTypeConverter.java     | 352 +++++++++++++++++++++
 .../flink/table/types/utils/TypeConversions.java   |  85 +++++
 .../org/apache/flink/table/types/DataTypeTest.java |   6 +
 .../apache/flink/table/types/DataTypesTest.java    |  12 +
 .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 +++++++++
 12 files changed, 926 insertions(+), 20 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java


[flink] 05/06: [hotfix][table-common] Fix invalid class to data type conversion

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b13b60dc127d075b378d2f2e0a02b1b738c1245
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 14:44:20 2019 +0200

    [hotfix][table-common] Fix invalid class to data type conversion
---
 .../java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
index 599c8b2..a71c682 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/ClassDataTypeConverter.java
@@ -56,7 +56,7 @@ public final class ClassDataTypeConverter {
 		addDefaultDataType(double.class, DataTypes.DOUBLE());
 		addDefaultDataType(java.sql.Date.class, DataTypes.DATE());
 		addDefaultDataType(java.time.LocalDate.class, DataTypes.DATE());
-		addDefaultDataType(java.sql.Time.class, DataTypes.TIME(9));
+		addDefaultDataType(java.sql.Time.class, DataTypes.TIME(3));
 		addDefaultDataType(java.time.LocalTime.class, DataTypes.TIME(9));
 		addDefaultDataType(java.sql.Timestamp.class, DataTypes.TIMESTAMP(9));
 		addDefaultDataType(java.time.LocalDateTime.class, DataTypes.TIMESTAMP(9));
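
For illustration only, a minimal sketch (not part of this commit) of the user-visible
effect, assuming the TypeConversions.fromClassToDataType(Class) entry point added later
in this push surfaces these defaults; printed summaries are expectations, not verified output:

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.Optional;

public class TimePrecisionExample {

	public static void main(String[] args) {
		// java.sql.Time carries at most millisecond precision, so its default
		// data type is now TIME(3) instead of the previous TIME(9)
		Optional<DataType> sqlTime = TypeConversions.fromClassToDataType(java.sql.Time.class);
		sqlTime.ifPresent(System.out::println); // expected: TIME(3)

		// java.time.LocalTime supports nanoseconds and keeps the TIME(9) default
		Optional<DataType> localTime = TypeConversions.fromClassToDataType(java.time.LocalTime.class);
		localTime.ifPresent(System.out::println); // expected: TIME(9)
	}
}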


[flink] 03/06: [hotfix][table-common] Fix equality of data types with same conversion class

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a71c200b470ffe7ae614870d5855d1e24673c08b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 13:35:46 2019 +0200

    [hotfix][table-common] Fix equality of data types with same conversion class
---
 .../flink/table/types/CollectionDataType.java      | 26 +++++++++++++---------
 .../org/apache/flink/table/types/DataType.java     | 20 +++++++++++------
 .../org/apache/flink/table/types/DataTypeTest.java |  6 +++++
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 17b7096..b50fea9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -43,7 +43,7 @@ public final class CollectionDataType extends DataType {
 			LogicalType logicalType,
 			@Nullable Class<?> conversionClass,
 			DataType elementDataType) {
-		super(logicalType, conversionClass);
+		super(logicalType, ensureArrayConversionClass(logicalType, elementDataType, conversionClass));
 		this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
 	}
 
@@ -82,16 +82,6 @@ public final class CollectionDataType extends DataType {
 	}
 
 	@Override
-	public Class<?> getConversionClass() {
-		// arrays are a special case because their default conversion class depends on the
-		// conversion class of the element type
-		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) {
-			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
-		}
-		return super.getConversionClass();
-	}
-
-	@Override
 	public <R> R accept(DataTypeVisitor<R> visitor) {
 		return visitor.visit(this);
 	}
@@ -115,4 +105,18 @@ public final class CollectionDataType extends DataType {
 	public int hashCode() {
 		return Objects.hash(super.hashCode(), elementDataType);
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static Class<?> ensureArrayConversionClass(
+			LogicalType logicalType,
+			DataType elementDataType,
+			@Nullable Class<?> clazz) {
+		// arrays are a special case because their default conversion class depends on the
+		// conversion class of the element type
+		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) {
+			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+		}
+		return clazz;
+	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
index 303a052..6b783e44 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -52,13 +52,15 @@ import java.util.Objects;
 @PublicEvolving
 public abstract class DataType implements Serializable {
 
-	protected LogicalType logicalType;
+	protected final LogicalType logicalType;
 
-	protected @Nullable Class<?> conversionClass;
+	protected final Class<?> conversionClass;
 
 	DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
 		this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null.");
-		this.conversionClass = performEarlyClassValidation(logicalType, conversionClass);
+		this.conversionClass = performEarlyClassValidation(
+			logicalType,
+			ensureConversionClass(logicalType, conversionClass));
 	}
 
 	/**
@@ -79,9 +81,6 @@ public abstract class DataType implements Serializable {
 	 * @return the expected conversion class
 	 */
 	public Class<?> getConversionClass() {
-		if (conversionClass == null) {
-			return logicalType.getDefaultConversion();
-		}
 		return conversionClass;
 	}
 
@@ -133,7 +132,7 @@ public abstract class DataType implements Serializable {
 		}
 		DataType dataType = (DataType) o;
 		return logicalType.equals(dataType.logicalType) &&
-			Objects.equals(conversionClass, dataType.conversionClass);
+			conversionClass.equals(dataType.conversionClass);
 	}
 
 	@Override
@@ -162,4 +161,11 @@ public abstract class DataType implements Serializable {
 		}
 		return candidate;
 	}
+
+	private static Class<?> ensureConversionClass(LogicalType logicalType, @Nullable Class<?> clazz) {
+		if (clazz == null) {
+			return logicalType.getDefaultConversion();
+		}
+		return clazz;
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index 2475db8..88b5cfd 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.types;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 
 import org.junit.Test;
@@ -128,4 +129,9 @@ public class DataTypeTest {
 	public void testInvalidOrderInterval() {
 		INTERVAL(MONTH(), YEAR(2));
 	}
+
+	@Test
+	public void testConversionEquality() {
+		assertEquals(DataTypes.VARCHAR(2).bridgedTo(String.class), DataTypes.VARCHAR(2));
+	}
 }
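
For illustration only, a minimal sketch (not part of this commit) of the equality
contract the fix establishes: bridging a data type to its own default conversion class
no longer breaks equals(), and since the conversion class is now resolved eagerly,
hashCode() stays consistent as well:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class DataTypeEqualityExample {

	public static void main(String[] args) {
		// before this fix, one side stored null and the other String.class
		DataType explicit = DataTypes.VARCHAR(2).bridgedTo(String.class);
		DataType implicit = DataTypes.VARCHAR(2); // String is the default conversion anyway

		System.out.println(explicit.equals(implicit));                  // true
		System.out.println(explicit.hashCode() == implicit.hashCode()); // true
	}
}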


[flink] 02/06: [hotfix][table-common] Add assumption about expressions and data types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ec14eb44db7b6ea28f560eb69c6d016890bdb70
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 12:23:20 2019 +0200

    [hotfix][table-common] Add assumption about expressions and data types
---
 .../main/java/org/apache/flink/table/expressions/Expression.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
index 6b68dce..57c73d4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/Expression.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
 
 import java.util.List;
 
@@ -26,8 +27,11 @@ import java.util.List;
  * The interface for all expressions.
  *
  * <p>Expressions represent a logical tree for producing a computation result. Every expression
- * consists of zero or more sub-expressions. Expressions might be literal values, function calls,
+ * consists of zero, one, or more sub-expressions. Expressions might be literal values, function calls,
  * or field references.
+ *
+ * <p>Expressions are part of the API. Thus, values and return types are expressed as instances of
+ * {@link DataType}.
  */
 @PublicEvolving
 public interface Expression {
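
As a hedged illustration of the tree structure documented above, a hypothetical helper
that counts expression nodes. Only the java.util.List import is visible in this hunk,
so the getChildren() signature used here is an assumption about the interface:

import org.apache.flink.table.expressions.Expression;

public final class ExpressionTreeSketch {

	// counts the nodes of a logical expression tree by recursing into the
	// zero, one, or more sub-expressions of every node
	// (assumes Expression exposes List<Expression> getChildren())
	public static int countNodes(Expression expression) {
		int count = 1;
		for (Expression child : expression.getChildren()) {
			count += countNodes(child);
		}
		return count;
	}

	private ExpressionTreeSketch() {
		// no instantiation
	}
}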


[flink] 06/06: [FLINK-12254][table-common] Add a converter between old type information behavior and data type

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 185ed0e2350b132bbec20b54e6cae6fe72710eae
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 15:45:47 2019 +0200

    [FLINK-12254][table-common] Add a converter between old type information behavior and data type
---
 .../types/logical/LegacyTypeInformationType.java   | 120 +++++++
 .../utils/LegacyTypeInfoDataTypeConverter.java     | 352 +++++++++++++++++++++
 .../flink/table/types/utils/TypeConversions.java   |  85 +++++
 .../types/LegacyTypeInfoDataTypeConverterTest.java | 147 +++++++++
 4 files changed, 704 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
new file mode 100644
index 0000000..a6d6a20
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LegacyTypeInformationType.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This type is a temporary solution to fully support the old type system stack through the new
+ * stack. Many types can be mapped directly to the new type system, however, some types such as
+ * {@code DECIMAL}, POJOs, or case classes need special handling.
+ *
+ * <p>This type differs from {@link TypeInformationAnyType}. This type is allowed to travel through
+ * the stack whereas {@link TypeInformationAnyType} should be resolved eagerly to {@link AnyType} by
+ * the planner.
+ *
+ * <p>This class can be removed once we have removed all deprecated methods that take or return
+ * {@link TypeInformation}.
+ *
+ * @see LegacyTypeInfoDataTypeConverter
+ */
+@Internal
+public final class LegacyTypeInformationType<T> extends LogicalType {
+
+	private static final String FORMAT = "LEGACY(%s)";
+
+	private final TypeInformation<T> typeInfo;
+
+	public LegacyTypeInformationType(LogicalTypeRoot logicalTypeRoot, TypeInformation<T> typeInfo) {
+		super(true, logicalTypeRoot);
+		this.typeInfo = Preconditions.checkNotNull(typeInfo, "Type information must not be null.");
+	}
+
+	public TypeInformation<T> getTypeInformation() {
+		return typeInfo;
+	}
+
+	@Override
+	public LogicalType copy(boolean isNullable) {
+		return new LegacyTypeInformationType<>(getTypeRoot(), typeInfo);
+	}
+
+	@Override
+	public String asSerializableString() {
+		throw new TableException("Legacy type information has no serializable string representation.");
+	}
+
+	@Override
+	public String asSummaryString() {
+		return withNullability(FORMAT, typeInfo);
+	}
+
+	@Override
+	public boolean supportsInputConversion(Class<?> clazz) {
+		return typeInfo.getTypeClass().isAssignableFrom(clazz);
+	}
+
+	@Override
+	public boolean supportsOutputConversion(Class<?> clazz) {
+		return clazz.isAssignableFrom(typeInfo.getTypeClass());
+	}
+
+	@Override
+	public Class<?> getDefaultConversion() {
+		return typeInfo.getTypeClass();
+	}
+
+	@Override
+	public List<LogicalType> getChildren() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public <R> R accept(LogicalTypeVisitor<R> visitor) {
+		return visitor.visit(this);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		LegacyTypeInformationType<?> that = (LegacyTypeInformationType<?>) o;
+		return typeInfo.equals(that.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), typeInfo);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
new file mode 100644
index 0000000..5c31be1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
+
+/**
+ * Converter between {@link TypeInformation} and {@link DataType} that reflects the behavior before
+ * Flink 1.9. The conversion is a 1:1 mapping that allows back-and-forth conversion.
+ *
+ * <p>This converter only exists to still support deprecated methods that take or return {@link TypeInformation}.
+ * Some methods will still support type information in the future, however, the future type information
+ * support will integrate nicer with the new type stack. This converter reflects the old behavior that includes:
+ *
+ * <p>Use old {@code java.sql.*} time classes for time data types.
+ *
+ * <p>Only support millisecond precision for timestamps or day-time intervals.
+ *
+ * <p>Do not support fractional seconds for the time type.
+ *
+ * <p>Let variable precision and scale for decimal types pass through the planner.
+ *
+ * <p>Let POJOs, case classes, and tuples pass through the planner.
+ *
+ * <p>Inconsistent nullability. Most types are nullable even though type information does not support it.
+ *
+ * <p>Distinction between {@link BasicArrayTypeInfo} and {@link ObjectArrayTypeInfo}.
+ */
+@Internal
+public final class LegacyTypeInfoDataTypeConverter {
+
+	private static final Map<TypeInformation<?>, DataType> typeInfoDataTypeMap = new HashMap<>();
+	private static final Map<DataType, TypeInformation<?>> dataTypeTypeInfoMap = new HashMap<>();
+	static {
+		addMapping(Types.STRING, DataTypes.STRING().bridgedTo(String.class));
+		addMapping(Types.BOOLEAN, DataTypes.BOOLEAN().bridgedTo(Boolean.class));
+		addMapping(Types.BYTE, DataTypes.TINYINT().bridgedTo(Byte.class));
+		addMapping(Types.SHORT, DataTypes.SMALLINT().bridgedTo(Short.class));
+		addMapping(Types.INT, DataTypes.INT().bridgedTo(Integer.class));
+		addMapping(Types.LONG, DataTypes.BIGINT().bridgedTo(Long.class));
+		addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
+		addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
+		addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
+		addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class));
+		addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class));
+		addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
+		addMapping(
+			TimeIntervalTypeInfo.INTERVAL_MONTHS,
+			DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class));
+		addMapping(
+			TimeIntervalTypeInfo.INTERVAL_MILLIS,
+			DataTypes.INTERVAL(DataTypes.SECOND(3)).bridgedTo(Long.class));
+		addMapping(
+			PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.BOOLEAN().notNull().bridgedTo(boolean.class)).bridgedTo(boolean[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.BYTES().bridgedTo(byte[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.SMALLINT().notNull().bridgedTo(short.class)).bridgedTo(short[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.INT().notNull().bridgedTo(int.class)).bridgedTo(int[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class)).bridgedTo(long[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class)).bridgedTo(float[].class));
+		addMapping(
+			PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+			DataTypes.ARRAY(DataTypes.DOUBLE().notNull().bridgedTo(double.class)).bridgedTo(double[].class));
+	}
+
+	private static void addMapping(TypeInformation<?> typeInfo, DataType dataType) {
+		Preconditions.checkArgument(!typeInfoDataTypeMap.containsKey(typeInfo));
+		typeInfoDataTypeMap.put(typeInfo, dataType);
+		dataTypeTypeInfoMap.put(dataType, typeInfo);
+	}
+
+	public static DataType toDataType(TypeInformation<?> typeInfo) {
+		// time indicators first as their hashCode/equals is shared with those of regular timestamps
+		if (typeInfo instanceof TimeIndicatorTypeInfo) {
+			return convertToTimeAttributeType((TimeIndicatorTypeInfo) typeInfo);
+		}
+
+		final DataType foundDataType = typeInfoDataTypeMap.get(typeInfo);
+		if (foundDataType != null) {
+			return foundDataType;
+		}
+
+		if (typeInfo instanceof RowTypeInfo) {
+			return convertToRowType((RowTypeInfo) typeInfo);
+		}
+
+		else if (typeInfo instanceof ObjectArrayTypeInfo) {
+			return convertToArrayType(
+				typeInfo.getTypeClass(),
+				((ObjectArrayTypeInfo) typeInfo).getComponentInfo());
+		}
+
+		else if (typeInfo instanceof BasicArrayTypeInfo) {
+			return createLegacyType(LogicalTypeRoot.ARRAY, typeInfo);
+		}
+
+		else if (typeInfo instanceof MultisetTypeInfo) {
+			return convertToMultisetType(((MultisetTypeInfo) typeInfo).getElementTypeInfo());
+		}
+
+		else if (typeInfo instanceof MapTypeInfo) {
+			return convertToMapType((MapTypeInfo) typeInfo);
+		}
+
+		else if (typeInfo instanceof CompositeType) {
+			return createLegacyType(LogicalTypeRoot.STRUCTURED_TYPE, typeInfo);
+		}
+
+		return createLegacyType(LogicalTypeRoot.ANY, typeInfo);
+	}
+
+	public static TypeInformation<?> toLegacyTypeInfo(DataType dataType) {
+		// time indicators first as their hashCode/equals is shared with those of regular timestamps
+		if (canConvertToTimeAttributeTypeInfo(dataType)) {
+			return convertToTimeAttributeTypeInfo((TimestampType) dataType.getLogicalType());
+		}
+
+		final TypeInformation<?> foundTypeInfo = dataTypeTypeInfoMap.get(dataType);
+		if (foundTypeInfo != null) {
+			return foundTypeInfo;
+		}
+
+		if (canConvertToLegacyTypeInfo(dataType)) {
+			return convertToLegacyTypeInfo(dataType);
+		}
+
+		else if (canConvertToRowTypeInfo(dataType)) {
+			return convertToRowTypeInfo((FieldsDataType) dataType);
+		}
+
+		// this could also match for basic array type info but this is covered by legacy type info
+		else if (canConvertToObjectArrayTypeInfo(dataType)) {
+			return convertToObjectArrayTypeInfo((CollectionDataType) dataType);
+		}
+
+		else if (canConvertToMultisetTypeInfo(dataType)) {
+			return convertToMultisetTypeInfo((CollectionDataType) dataType);
+		}
+
+		else if (canConvertToMapTypeInfo(dataType)) {
+			return convertToMapTypeInfo((KeyValueDataType) dataType);
+		}
+
+		// makes the any type accessible in the legacy planner
+		else if (canConvertToAnyTypeInfo(dataType)) {
+			return convertToAnyTypeInfo(dataType);
+		}
+
+		throw new TableException(
+			String.format(
+				"Unsupported conversion from data type '%s' to type information. Only data types " +
+					"that originated from type information fully support a reverse conversion.",
+				dataType));
+	}
+
+	private static DataType createLegacyType(LogicalTypeRoot typeRoot, TypeInformation<?> typeInfo) {
+		return new AtomicDataType(new LegacyTypeInformationType<>(typeRoot, typeInfo))
+			.bridgedTo(typeInfo.getTypeClass());
+	}
+
+	private static DataType convertToTimeAttributeType(TimeIndicatorTypeInfo timeIndicatorTypeInfo) {
+		final TimestampKind kind;
+		if (timeIndicatorTypeInfo.isEventTime()) {
+			kind = TimestampKind.ROWTIME;
+		} else {
+			kind = TimestampKind.PROCTIME;
+		}
+		return new AtomicDataType(new TimestampType(true, kind, 3))
+			.bridgedTo(java.sql.Timestamp.class);
+	}
+
+	private static boolean canConvertToTimeAttributeTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
+			dataTypeTypeInfoMap.containsKey(dataType) && // checks precision and conversion
+			((TimestampType) dataType.getLogicalType()).getKind() != TimestampKind.REGULAR;
+	}
+
+	private static TypeInformation<?> convertToTimeAttributeTypeInfo(TimestampType timestampType) {
+		if (isRowtimeAttribute(timestampType)) {
+			return TimeIndicatorTypeInfo.ROWTIME_INDICATOR;
+		} else {
+			return TimeIndicatorTypeInfo.PROCTIME_INDICATOR;
+		}
+	}
+
+	private static DataType convertToRowType(RowTypeInfo rowTypeInfo) {
+		final String[] fieldNames = rowTypeInfo.getFieldNames();
+		final DataTypes.Field[] fields = IntStream.range(0, rowTypeInfo.getArity())
+			.mapToObj(i -> {
+				DataType fieldType = toDataType(rowTypeInfo.getTypeAt(i));
+
+				return DataTypes.FIELD(
+					fieldNames[i],
+					fieldType);
+			})
+			.toArray(DataTypes.Field[]::new);
+
+		return DataTypes.ROW(fields).bridgedTo(Row.class);
+	}
+
+	private static boolean canConvertToRowTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ROW) &&
+			dataType.getConversionClass().equals(Row.class) &&
+			((RowType) dataType.getLogicalType()).getFields().stream()
+				.noneMatch(f -> f.getDescription().isPresent());
+	}
+
+	private static TypeInformation<?> convertToRowTypeInfo(FieldsDataType fieldsDataType) {
+		final RowType rowType = (RowType) fieldsDataType.getLogicalType();
+
+		final String[] fieldNames = rowType.getFields()
+			.stream()
+			.map(RowType.RowField::getName)
+			.toArray(String[]::new);
+
+		final TypeInformation<?>[] fieldTypes = Stream.of(fieldNames)
+			.map(name -> fieldsDataType.getFieldDataTypes().get(name))
+			.map(LegacyTypeInfoDataTypeConverter::toLegacyTypeInfo)
+			.toArray(TypeInformation[]::new);
+
+		return Types.ROW_NAMED(fieldNames, fieldTypes);
+	}
+
+	private static DataType convertToArrayType(Class<?> arrayClass, TypeInformation<?> elementTypeInfo) {
+		return DataTypes.ARRAY(toDataType(elementTypeInfo)).bridgedTo(arrayClass);
+	}
+
+	private static boolean canConvertToObjectArrayTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.ARRAY) &&
+			dataType.getConversionClass().isArray();
+	}
+
+	private static TypeInformation<?> convertToObjectArrayTypeInfo(CollectionDataType collectionDataType) {
+		// Types.OBJECT_ARRAY would return a basic type info for strings
+		return ObjectArrayTypeInfo.getInfoFor(
+			toLegacyTypeInfo(collectionDataType.getElementDataType()));
+	}
+
+	private static DataType convertToMultisetType(TypeInformation elementTypeInfo) {
+		return DataTypes.MULTISET(toDataType(elementTypeInfo)).bridgedTo(Map.class);
+	}
+
+	private static boolean canConvertToMultisetTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.MULTISET) &&
+			dataType.getConversionClass() == Map.class;
+	}
+
+	private static TypeInformation<?> convertToMultisetTypeInfo(CollectionDataType collectionDataType) {
+		return new MultisetTypeInfo<>(
+			toLegacyTypeInfo(collectionDataType.getElementDataType()));
+	}
+
+	private static DataType convertToMapType(MapTypeInfo typeInfo) {
+		return DataTypes.MAP(
+				toDataType(typeInfo.getKeyTypeInfo()),
+				toDataType(typeInfo.getValueTypeInfo()))
+			.bridgedTo(Map.class);
+	}
+
+	private static boolean canConvertToMapTypeInfo(DataType dataType) {
+		return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.MAP) &&
+			dataType.getConversionClass() == Map.class;
+	}
+
+	private static TypeInformation<?> convertToMapTypeInfo(KeyValueDataType dataType) {
+		return Types.MAP(
+			toLegacyTypeInfo(dataType.getKeyDataType()),
+			toLegacyTypeInfo(dataType.getValueDataType()));
+	}
+
+	private static boolean canConvertToLegacyTypeInfo(DataType dataType) {
+		return dataType.getLogicalType() instanceof LegacyTypeInformationType;
+	}
+
+	private static TypeInformation<?> convertToLegacyTypeInfo(DataType dataType) {
+		return ((LegacyTypeInformationType) dataType.getLogicalType()).getTypeInformation();
+	}
+
+	private static boolean canConvertToAnyTypeInfo(DataType dataType) {
+		return dataType.getLogicalType() instanceof TypeInformationAnyType &&
+			dataType.getConversionClass().equals(
+				((TypeInformationAnyType) dataType.getLogicalType()).getTypeInformation().getTypeClass());
+	}
+
+	private static TypeInformation<?> convertToAnyTypeInfo(DataType dataType) {
+		return ((TypeInformationAnyType) dataType.getLogicalType()).getTypeInformation();
+	}
+
+	private LegacyTypeInfoDataTypeConverter() {
+		// no instantiation
+	}
+}
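
For illustration only, a minimal sketch (not part of this commit) of the fallback path
implemented above: type information without a 1:1 mapping, here BigDecimal whose legacy
behavior has no fixed precision and scale, is wrapped in a LegacyTypeInformationType so
it can still travel through the new stack and be restored later; the printed summary is
an expectation, not verified output:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;

public class LegacyFallbackExample {

	public static void main(String[] args) {
		// BIG_DEC cannot map to a proper DECIMAL(p, s), so it is wrapped
		DataType legacyDecimal = LegacyTypeInfoDataTypeConverter.toDataType(Types.BIG_DEC);
		System.out.println(legacyDecimal); // expected summary: LEGACY(BigDecimal)

		// the reverse conversion restores the original type information
		System.out.println(
			LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(legacyDecimal).equals(Types.BIG_DEC)); // true
	}
}
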
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
new file mode 100644
index 0000000..a5c978f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeConversions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+/**
+ * Conversion hub for interoperability of {@link Class}, {@link TypeInformation}, {@link DataType},
+ * and {@link LogicalType}.
+ *
+ * <p>See the corresponding converter classes for more information about how the conversion is performed.
+ */
+@Internal
+public final class TypeConversions {
+
+	public static DataType fromLegacyInfoToDataType(TypeInformation<?> typeInfo) {
+		return LegacyTypeInfoDataTypeConverter.toDataType(typeInfo);
+	}
+
+	public static DataType[] fromLegacyInfoToDataType(TypeInformation<?>[] typeInfo) {
+		return Stream.of(typeInfo)
+			.map(TypeConversions::fromLegacyInfoToDataType)
+			.toArray(DataType[]::new);
+	}
+
+	public static TypeInformation<?> fromDataTypeToLegacyInfo(DataType dataType) {
+		return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(dataType);
+	}
+
+	public static TypeInformation<?>[] fromDataTypeToLegacyInfo(DataType[] dataType) {
+		return Stream.of(dataType)
+			.map(TypeConversions::fromDataTypeToLegacyInfo)
+			.toArray(TypeInformation[]::new);
+	}
+
+	public static Optional<DataType> fromClassToDataType(Class<?> clazz) {
+		return ClassDataTypeConverter.extractDataType(clazz);
+	}
+
+	public static DataType fromLogicalToDataType(LogicalType logicalType) {
+		return LogicalTypeDataTypeConverter.toDataType(logicalType);
+	}
+
+	public static DataType[] fromLogicalToDataType(LogicalType[] logicalTypes) {
+		return Stream.of(logicalTypes)
+			.map(LogicalTypeDataTypeConverter::toDataType)
+			.toArray(DataType[]::new);
+	}
+
+	public static LogicalType fromDataToLogicalType(DataType dataType) {
+		return dataType.getLogicalType();
+	}
+
+	public static LogicalType[] fromDataToLogicalType(DataType[] dataTypes) {
+		return Stream.of(dataTypes)
+			.map(TypeConversions::fromDataToLogicalType)
+			.toArray(LogicalType[]::new);
+	}
+
+	private TypeConversions() {
+		// no instance
+	}
+}
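
For illustration only, a short usage sketch (not part of this commit) of the array
overloads above, e.g. for converting all field types of a schema in one call:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.Arrays;

public class TypeConversionsExample {

	public static void main(String[] args) {
		TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT, Types.SQL_TIMESTAMP};

		// every element goes through the legacy converter and keeps its
		// pre-1.9 bridging class (e.g. java.sql.Timestamp for SQL_TIMESTAMP)
		DataType[] dataTypes = TypeConversions.fromLegacyInfoToDataType(fieldTypes);
		System.out.println(Arrays.toString(dataTypes));

		// the conversion is 1:1 for types that originated from type information
		TypeInformation<?>[] roundTrip = TypeConversions.fromDataTypeToLegacyInfo(dataTypes);
		System.out.println(Arrays.equals(fieldTypes, roundTrip)); // true
	}
}
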
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java
new file mode 100644
index 0000000..fbce34e
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LegacyTypeInfoDataTypeConverterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LegacyTypeInfoDataTypeConverter}.
+ */
+@RunWith(Parameterized.class)
+public class LegacyTypeInfoDataTypeConverterTest {
+
+	@Parameters(name = "[{index}] type info: {0} data type: {1}")
+	public static List<Object[]> typeInfo() {
+		return Arrays.asList(
+			new Object[][]{
+				{Types.STRING, DataTypes.STRING()},
+
+				{Types.BOOLEAN, DataTypes.BOOLEAN()},
+
+				{Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)},
+
+				{
+					Types.GENERIC(LegacyTypeInfoDataTypeConverterTest.class),
+					new AtomicDataType(
+						new LegacyTypeInformationType<>(
+							LogicalTypeRoot.ANY,
+							Types.GENERIC(LegacyTypeInfoDataTypeConverterTest.class)))
+				},
+
+				{
+					Types.ROW_NAMED(new String[] {"field1", "field2"}, Types.INT, Types.LONG),
+					DataTypes.ROW(
+							FIELD("field1", DataTypes.INT()),
+							FIELD("field2", DataTypes.BIGINT()))
+				},
+
+				{
+					Types.MAP(Types.FLOAT, Types.ROW(Types.BYTE)),
+					DataTypes.MAP(DataTypes.FLOAT(), DataTypes.ROW(FIELD("f0", DataTypes.TINYINT())))
+				},
+
+				{
+					Types.PRIMITIVE_ARRAY(Types.FLOAT),
+					DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class))
+						.bridgedTo(float[].class)
+				},
+
+				{
+					Types.PRIMITIVE_ARRAY(Types.BYTE),
+					DataTypes.BYTES()
+				},
+
+				{
+					Types.OBJECT_ARRAY(Types.PRIMITIVE_ARRAY(Types.FLOAT)),
+					DataTypes.ARRAY(
+						DataTypes.ARRAY(DataTypes.FLOAT().notNull().bridgedTo(float.class))
+							.bridgedTo(float[].class))
+						.bridgedTo(float[][].class)
+				},
+
+				{
+					BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+					new AtomicDataType(
+						new LegacyTypeInformationType<>(
+							LogicalTypeRoot.ARRAY,
+							BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO))
+				},
+
+				{
+					ObjectArrayTypeInfo.getInfoFor(Types.STRING),
+					DataTypes.ARRAY(DataTypes.STRING())
+						.bridgedTo(String[].class)
+				},
+
+				{
+					Types.TUPLE(Types.SHORT, Types.DOUBLE, Types.FLOAT),
+					new AtomicDataType(
+						new LegacyTypeInformationType<>(
+							LogicalTypeRoot.STRUCTURED_TYPE,
+							Types.TUPLE(Types.SHORT, Types.DOUBLE, Types.FLOAT)))
+				},
+
+				{
+					TimeIndicatorTypeInfo.ROWTIME_INDICATOR,
+					new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
+						.bridgedTo(java.sql.Timestamp.class)
+				}
+			}
+		);
+	}
+
+	@Parameter
+	public TypeInformation<?> inputTypeInfo;
+
+	@Parameter(1)
+	public DataType dataType;
+
+	@Test
+	public void testTypeInfoToDataTypeConversion() {
+		assertThat(LegacyTypeInfoDataTypeConverter.toDataType(inputTypeInfo), equalTo(dataType));
+	}
+
+	@Test
+	public void testDataTypeToTypeInfoConversion() {
+		assertThat(LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(dataType), equalTo(inputTypeInfo));
+	}
+}
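
For illustration only, a minimal sketch (not part of this commit) of the time attribute
corner case covered by the last parameter above: ROWTIME_INDICATOR shares hashCode/equals
with SQL_TIMESTAMP, so the converter checks for time indicators before consulting its
lookup map; otherwise a rowtime attribute would silently collapse into a plain TIMESTAMP(3):

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;

public class TimeAttributeExample {

	public static void main(String[] args) {
		DataType rowtime =
			LegacyTypeInfoDataTypeConverter.toDataType(TimeIndicatorTypeInfo.ROWTIME_INDICATOR);

		// a TIMESTAMP(3) with ROWTIME kind, bridged to java.sql.Timestamp,
		// and not equal to the result of converting a plain SQL_TIMESTAMP
		System.out.println(rowtime);
	}
}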


[flink] 01/06: [hotfix][table-common] Add default precision temporal data types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36fef4457a7f1de47989c8a2485581bcf8633b32
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 12:19:04 2019 +0200

    [hotfix][table-common] Add default precision temporal data types
---
 .../java/org/apache/flink/table/api/DataTypes.java | 83 ++++++++++++++++++++++
 .../apache/flink/table/types/DataTypesTest.java    | 12 ++++
 2 files changed, 95 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 2591227..dc7c319 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -240,6 +240,7 @@ public final class DataTypes {
 	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
 	 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
 	 *
+	 * @see #TIME()
 	 * @see TimeType
 	 */
 	public static DataType TIME(int precision) {
@@ -247,6 +248,22 @@ public final class DataTypes {
 	}
 
 	/**
+	 * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
+	 *
+	 * <p>An instance consists of {@code hour:minute:second} with up to second precision
+	 * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
+	 *
+	 * @see #TIME(int)
+	 * @see TimeType
+	 */
+	public static DataType TIME() {
+		return new AtomicDataType(new TimeType());
+	}
+
+	/**
 	 * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP(p)} where {@code p} is the number
 	 * of digits of fractional seconds (=precision). {@code p} must have a value between 0 and 9 (both
 	 * inclusive).
@@ -267,6 +284,26 @@ public final class DataTypes {
 	}
 
 	/**
+	 * Data type of a timestamp WITHOUT time zone {@code TIMESTAMP} with 6 digits of fractional seconds
+	 * by default.
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional]} with up to
+	 * microsecond precision and values ranging from {@code 0000-01-01 00:00:00.000000} to
+	 * {@code 9999-12-31 23:59:59.999999}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.LocalDateTime}.
+	 *
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see TimestampType
+	 */
+	public static DataType TIMESTAMP() {
+		return new AtomicDataType(new TimestampType());
+	}
+
+	/**
 	 * Data type of a timestamp WITH time zone {@code TIMESTAMP(p) WITH TIME ZONE} where {@code p} is
 	 * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0
 	 * and 9 (both inclusive).
@@ -287,6 +324,26 @@ public final class DataTypes {
 	}
 
 	/**
+	 * Data type of a timestamp WITH time zone {@code TIMESTAMP WITH TIME ZONE} with 6 digits of fractional
+	 * seconds by default.
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to microsecond precision and values ranging from {@code 0000-01-01 00:00:00.000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999 -14:59}.
+	 *
+	 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
+	 * semantics are closer to {@link java.time.OffsetDateTime}.
+	 *
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see ZonedTimestampType
+	 */
+	public static DataType TIMESTAMP_WITH_TIME_ZONE() {
+		return new AtomicDataType(new ZonedTimestampType());
+	}
+
+	/**
 	 * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where
 	 * {@code p} is the number of digits of fractional seconds (=precision). {@code p} must have a value
 	 * between 0 and 9 (both inclusive).
@@ -313,6 +370,32 @@ public final class DataTypes {
 	}
 
 	/**
+	 * Data type of a timestamp WITH LOCAL time zone {@code TIMESTAMP WITH LOCAL TIME ZONE} with 6 digits
+	 * of fractional seconds by default.
+	 *
+	 * <p>An instance consists of {@code year-month-day hour:minute:second[.fractional] zone} with up
+	 * to microsecond precision and values ranging from {@code 0000-01-01 00:00:00.000000 +14:59} to
+	 * {@code 9999-12-31 23:59:59.999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not supported
+	 * as the semantics are closer to {@link java.time.OffsetDateTime}.
+	 *
+	 * <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored physically
+	 * in every datum. Instead, the type assumes {@link java.time.Instant} semantics in UTC time zone
+	 * at the edges of the table ecosystem. Every datum is interpreted in the local time zone configured
+	 * in the current session for computation and visualization.
+	 *
+	 * <p>This type fills the gap between time zone free and time zone mandatory timestamp types by
+	 * allowing the interpretation of UTC timestamps according to the configured session timezone.
+	 *
+	 * @see #TIMESTAMP_WITH_LOCAL_TIME_ZONE(int)
+	 * @see #TIMESTAMP(int)
+	 * @see #TIMESTAMP_WITH_TIME_ZONE(int)
+	 * @see LocalZonedTimestampType
+	 */
+	public static DataType TIMESTAMP_WITH_LOCAL_TIME_ZONE() {
+		return new AtomicDataType(new LocalZonedTimestampType());
+	}
+
+	/**
 	 * Data type of a temporal interval. There are two types of temporal intervals: day-time intervals
 	 * with up to nanosecond granularity or year-month intervals with up to month granularity.
 	 *
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index badfc9a..3945fb5 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -144,16 +144,28 @@ public class DataTypesTest {
 
 				{TIME(3), new TimeType(3), java.time.LocalTime.class},
 
+				{TIME(), new TimeType(0), java.time.LocalTime.class},
+
 				{TIMESTAMP(3), new TimestampType(3), java.time.LocalDateTime.class},
 
+				{TIMESTAMP(), new TimestampType(6), java.time.LocalDateTime.class},
+
 				{TIMESTAMP_WITH_TIME_ZONE(3),
 					new ZonedTimestampType(3),
 					java.time.OffsetDateTime.class},
 
+				{TIMESTAMP_WITH_TIME_ZONE(),
+					new ZonedTimestampType(6),
+					java.time.OffsetDateTime.class},
+
 				{TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
 					new LocalZonedTimestampType(3),
 					java.time.Instant.class},
 
+				{TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+					new LocalZonedTimestampType(6),
+					java.time.Instant.class},
+
 				{INTERVAL(MINUTE(), SECOND(3)),
 					new DayTimeIntervalType(MINUTE_TO_SECOND, DEFAULT_DAY_PRECISION, 3),
 					java.time.Duration.class},
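
For illustration only, the defaults established above (no fractional seconds for TIME,
microseconds for the TIMESTAMP variants) in a minimal sketch that is not part of this commit:

import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;

public class DefaultPrecisionExample {

	public static void main(String[] args) {
		// TIME() defaults to no fractional seconds, i.e. TIME(0)
		System.out.println(TIME().equals(TIME(0)));           // true

		// TIMESTAMP() defaults to microsecond precision, i.e. TIMESTAMP(6)
		System.out.println(TIMESTAMP().equals(TIMESTAMP(6))); // true
		System.out.println(TIMESTAMP().equals(TIMESTAMP(3))); // false
	}
}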


[flink] 04/06: [hotfix][table-common] Add logical type check utilities

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d7d24425e47408eb094b7f5aa6ecfcffd7f89db7
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 14:42:02 2019 +0200

    [hotfix][table-common] Add logical type check utilities
---
 .../types/logical/utils/LogicalTypeChecks.java     | 87 ++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
new file mode 100644
index 0000000..3958922
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+/**
+ * Utilities for checking {@link LogicalType}.
+ */
+@Internal
+public final class LogicalTypeChecks {
+
+	private static final TimeAttributeChecker TIME_ATTRIBUTE_CHECKER = new TimeAttributeChecker();
+
+	public static boolean hasRoot(LogicalType logicalType, LogicalTypeRoot typeRoot) {
+		return logicalType.getTypeRoot() == typeRoot;
+	}
+
+	public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
+		return logicalType.getTypeRoot().getFamilies().contains(family);
+	}
+
+	public static boolean isTimeAttribute(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) != TimestampKind.REGULAR;
+	}
+
+	public static boolean isRowtimeAttribute(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.ROWTIME;
+	}
+
+	public static boolean isProctimeAttribute(LogicalType logicalType) {
+		return logicalType.accept(TIME_ATTRIBUTE_CHECKER) == TimestampKind.PROCTIME;
+	}
+
+	private LogicalTypeChecks() {
+		// no instantiation
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class TimeAttributeChecker extends LogicalTypeDefaultVisitor<TimestampKind> {
+
+		@Override
+		public TimestampKind visit(TimestampType timestampType) {
+			return timestampType.getKind();
+		}
+
+		@Override
+		public TimestampKind visit(ZonedTimestampType zonedTimestampType) {
+			return zonedTimestampType.getKind();
+		}
+
+		@Override
+		public TimestampKind visit(LocalZonedTimestampType localZonedTimestampType) {
+			return localZonedTimestampType.getKind();
+		}
+
+		@Override
+		protected TimestampKind defaultMethod(LogicalType logicalType) {
+			// we don't verify that type is actually a timestamp
+			return TimestampKind.REGULAR;
+		}
+	}
+}
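
For illustration only, a short usage sketch (not part of this commit) of the utilities
above, using the TimestampType constructors that appear elsewhere in this push:

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.TimestampType;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;

public class LogicalTypeChecksExample {

	public static void main(String[] args) {
		LogicalType rowtime = new TimestampType(true, TimestampKind.ROWTIME, 3);
		LogicalType plain = new TimestampType(3);

		// both share the TIMESTAMP_WITHOUT_TIME_ZONE root...
		System.out.println(hasRoot(rowtime, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)); // true
		System.out.println(hasRoot(plain, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE));   // true

		// ...but only one of them is a rowtime attribute
		System.out.println(isRowtimeAttribute(rowtime)); // true
		System.out.println(isRowtimeAttribute(plain));   // false
	}
}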