You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/24 12:07:04 UTC
[2/3] flink git commit: [FLINK-7137] [table] Fix nullability for
nested types
[FLINK-7137] [table] Fix nullability for nested types
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1e08a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1e08a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1e08a0
Branch: refs/heads/master
Commit: 2d1e08a02d84f8d7cb2734e09741eae72bf63b7d
Parents: bb11810
Author: Rong Rong <ro...@uber.com>
Authored: Wed Jul 12 21:29:16 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200
----------------------------------------------------------------------
.../apache/flink/table/calcite/FlinkTypeFactory.scala | 12 +++++++++---
.../flink/table/plan/schema/CompositeRelDataType.scala | 2 +-
.../flink/table/expressions/ScalarOperatorsTest.scala | 11 +++++++++++
.../expressions/utils/ScalarOperatorsTestBase.scala | 6 ++++--
4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index eba1623..b63a3ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -51,7 +51,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
- def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+ def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
+ createTypeFromTypeInfo(typeInfo, nullable = false)
+
+ def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
// simple type can be converted to SQL types and vice versa
if (isSimple(typeInfo)) {
val sqlType = typeInfoToSqlTypeName(typeInfo)
@@ -73,7 +76,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
case _ =>
- createSqlType(sqlType)
+ createTypeWithNullability(createSqlType(sqlType), nullable)
}
}
// advanced types require specific RelDataType
@@ -191,7 +194,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createTypeFromTypeInfo(mp.getValueTypeInfo), true)
case ti: TypeInformation[_] =>
- new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+ createTypeWithNullability(
+ new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
+ nullable = true
+ )
case ti@_ =>
throw TableException(s"Unsupported type information: $ti")
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index a60514b..3694cc5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -73,7 +73,7 @@ object CompositeRelDataType {
new RelDataTypeFieldImpl(
name,
index,
- typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
+ typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
.asInstanceOf[RelDataTypeField]
}
.toList
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 738413e..6cb9fa8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -138,6 +138,17 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
@Test
def testOtherExpressions(): Unit = {
+
+ // nested field null type
+ testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
+ testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
+ testAllApis('f13.isNull, "f13.isNull", "f13 IS NULL", "false")
+ testAllApis('f13.isNotNull, "f13.isNotNull", "f13 IS NOT NULL", "true")
+ testAllApis('f13.get("f0").isNull, "f13.get('f0').isNull", "f13.f0 IS NULL", "false")
+ testAllApis('f13.get("f0").isNotNull, "f13.get('f0').isNotNull", "f13.f0 IS NOT NULL", "true")
+ testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
+ testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+
// boolean literals
testAllApis(
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 2d22843..b719390 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
class ScalarOperatorsTestBase extends ExpressionTestBase {
def testData: Row = {
- val testData = new Row(13)
+ val testData = new Row(14)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
testData.setField(2, 1)
@@ -41,6 +41,7 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
testData.setField(10, "String")
testData.setField(11, false)
testData.setField(12, null)
+ testData.setField(13, Row.of("foo", null))
testData
}
@@ -58,7 +59,8 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
Types.INT,
Types.STRING,
Types.BOOLEAN,
- Types.BOOLEAN
+ Types.BOOLEAN,
+ Types.ROW(Types.STRING, Types.STRING)
).asInstanceOf[TypeInformation[Any]]
}