You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/10/18 16:08:18 UTC

[flink] branch release-1.8 updated: [FLINK-12848][table] Fix invalid row type caching

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 31622bb  [FLINK-12848][table] Fix invalid row type caching
31622bb is described below

commit 31622bbade03cfe1189eb621207254d3b8f3bcab
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Oct 17 17:16:57 2019 +0200

    [FLINK-12848][table] Fix invalid row type caching
    
    This closes #9930.
---
 .../flink/api/java/typeutils/RowTypeInfo.java      |  6 ++-
 .../flink/table/calcite/FlinkTypeFactory.scala     | 49 ++++++++++------------
 2 files changed, 26 insertions(+), 29 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 75c28ef..aec070c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -41,7 +41,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * TypeInformation for {@link Row}
+ * {@link TypeInformation} for {@link Row}.
+ *
+ * Note: The implementations of {@link #hashCode()} and {@link #equals(Object)} do not check field
+ * names because those don't matter during serialization and runtime. This might change in future
+ * versions. See FLINK-14438 for more information.
  */
 @PublicEvolving
 public class RowTypeInfo extends TupleTypeInfoBase<Row> {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index c48967b..6286178 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -53,44 +53,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
-
   def createTypeFromTypeInfo(
       typeInfo: TypeInformation[_],
       isNullable: Boolean)
     : RelDataType = {
 
-      // we cannot use seenTypes for simple types,
-      // because time indicators and timestamps would be the same
-
-      val relType = if (isSimple(typeInfo)) {
-        // simple types can be converted to SQL types and vice versa
-        val sqlType = typeInfoToSqlTypeName(typeInfo)
-        sqlType match {
+    val relType = if (isSimple(typeInfo)) {
+      // simple types can be converted to SQL types and vice versa
+      val sqlType = typeInfoToSqlTypeName(typeInfo)
+      sqlType match {
 
-          case INTERVAL_YEAR_MONTH =>
-            createSqlIntervalType(
-              new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+        case INTERVAL_YEAR_MONTH =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
 
-          case INTERVAL_DAY_SECOND =>
-            createSqlIntervalType(
-              new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+        case INTERVAL_DAY_SECOND =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
 
-          case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
-            if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
-              createRowtimeIndicatorType()
-            } else {
-              createProctimeIndicatorType()
-            }
+        case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+          if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+            createRowtimeIndicatorType()
+          } else {
+            createProctimeIndicatorType()
+          }
 
-          case _ =>
-            createSqlType(sqlType)
-        }
-      } else {
-        // advanced types require specific RelDataType
-        // for storing the original TypeInformation
-        seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
+        case _ =>
+          createSqlType(sqlType)
       }
+    } else {
+      createAdvancedType(typeInfo, isNullable)
+    }
 
     createTypeWithNullability(relType, isNullable)
   }