You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/24 12:07:04 UTC

[2/3] flink git commit: [FLINK-7137] [table] Fix nullability for nested types

[FLINK-7137] [table] Fix nullability for nested types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1e08a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1e08a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1e08a0

Branch: refs/heads/master
Commit: 2d1e08a02d84f8d7cb2734e09741eae72bf63b7d
Parents: bb11810
Author: Rong Rong <ro...@uber.com>
Authored: Wed Jul 12 21:29:16 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/table/calcite/FlinkTypeFactory.scala   | 12 +++++++++---
 .../flink/table/plan/schema/CompositeRelDataType.scala  |  2 +-
 .../flink/table/expressions/ScalarOperatorsTest.scala   | 11 +++++++++++
 .../expressions/utils/ScalarOperatorsTestBase.scala     |  6 ++++--
 4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index eba1623..b63a3ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -51,7 +51,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
   private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
 
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
+    createTypeFromTypeInfo(typeInfo, nullable = false)
+
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
     // simple type can be converted to SQL types and vice versa
     if (isSimple(typeInfo)) {
       val sqlType = typeInfoToSqlTypeName(typeInfo)
@@ -73,7 +76,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           }
 
         case _ =>
-          createSqlType(sqlType)
+          createTypeWithNullability(createSqlType(sqlType), nullable)
       }
     }
     // advanced types require specific RelDataType
@@ -191,7 +194,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         createTypeFromTypeInfo(mp.getValueTypeInfo), true)
 
     case ti: TypeInformation[_] =>
-      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+      createTypeWithNullability(
+        new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
+        nullable = true
+      )
 
     case ti@_ =>
       throw TableException(s"Unsupported type information: $ti")

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index a60514b..3694cc5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -73,7 +73,7 @@ object CompositeRelDataType {
         new RelDataTypeFieldImpl(
           name,
           index,
-          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
             .asInstanceOf[RelDataTypeField]
       }
       .toList

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 738413e..6cb9fa8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -138,6 +138,17 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 
   @Test
   def testOtherExpressions(): Unit = {
+
+    // nested field null type
+    testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
+    testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
+    testAllApis('f13.isNull, "f13.isNull", "f13 IS NULL", "false")
+    testAllApis('f13.isNotNull, "f13.isNotNull", "f13 IS NOT NULL", "true")
+    testAllApis('f13.get("f0").isNull, "f13.get('f0').isNull", "f13.f0 IS NULL", "false")
+    testAllApis('f13.get("f0").isNotNull, "f13.get('f0').isNotNull", "f13.f0 IS NOT NULL", "true")
+    testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
+    testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+
     // boolean literals
     testAllApis(
       true,

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 2d22843..b719390 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
 class ScalarOperatorsTestBase extends ExpressionTestBase {
 
   def testData: Row = {
-    val testData = new Row(13)
+    val testData = new Row(14)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -41,6 +41,7 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
     testData.setField(10, "String")
     testData.setField(11, false)
     testData.setField(12, null)
+    testData.setField(13, Row.of("foo", null))
     testData
   }
 
@@ -58,7 +59,8 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
       Types.INT,
       Types.STRING,
       Types.BOOLEAN,
-      Types.BOOLEAN
+      Types.BOOLEAN,
+      Types.ROW(Types.STRING, Types.STRING)
       ).asInstanceOf[TypeInformation[Any]]
   }