You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/10/18 16:08:18 UTC
[flink] branch release-1.8 updated: [FLINK-12848][table] Fix
invalid row type caching
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 31622bb [FLINK-12848][table] Fix invalid row type caching
31622bb is described below
commit 31622bbade03cfe1189eb621207254d3b8f3bcab
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Oct 17 17:16:57 2019 +0200
[FLINK-12848][table] Fix invalid row type caching
This closes #9930.
---
.../flink/api/java/typeutils/RowTypeInfo.java | 6 ++-
.../flink/table/calcite/FlinkTypeFactory.scala | 49 ++++++++++------------
2 files changed, 26 insertions(+), 29 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 75c28ef..aec070c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -41,7 +41,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
- * TypeInformation for {@link Row}
+ * {@link TypeInformation} for {@link Row}.
+ *
+ * Note: The implementations of {@link #hashCode()} and {@link #equals(Object)} do not check field
+ * names because those don't matter during serialization and runtime. This might change in future
+ * versions. See FLINK-14438 for more information.
*/
@PublicEvolving
public class RowTypeInfo extends TupleTypeInfoBase<Row> {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index c48967b..6286178 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -53,44 +53,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
// NOTE: for future data types it might be necessary to
// override more methods of RelDataTypeFactoryImpl
- private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
-
def createTypeFromTypeInfo(
typeInfo: TypeInformation[_],
isNullable: Boolean)
: RelDataType = {
- // we cannot use seenTypes for simple types,
- // because time indicators and timestamps would be the same
-
- val relType = if (isSimple(typeInfo)) {
- // simple types can be converted to SQL types and vice versa
- val sqlType = typeInfoToSqlTypeName(typeInfo)
- sqlType match {
+ val relType = if (isSimple(typeInfo)) {
+ // simple types can be converted to SQL types and vice versa
+ val sqlType = typeInfoToSqlTypeName(typeInfo)
+ sqlType match {
- case INTERVAL_YEAR_MONTH =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+ case INTERVAL_YEAR_MONTH =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
- case INTERVAL_DAY_SECOND =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+ case INTERVAL_DAY_SECOND =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
- case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
- if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
- createRowtimeIndicatorType()
- } else {
- createProctimeIndicatorType()
- }
+ case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+ if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+ createRowtimeIndicatorType()
+ } else {
+ createProctimeIndicatorType()
+ }
- case _ =>
- createSqlType(sqlType)
- }
- } else {
- // advanced types require specific RelDataType
- // for storing the original TypeInformation
- seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
+ case _ =>
+ createSqlType(sqlType)
}
+ } else {
+ createAdvancedType(typeInfo, isNullable)
+ }
createTypeWithNullability(relType, isNullable)
}