You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 21:38:35 UTC

[2/3] flink git commit: [FLINK-7922] [table] Fix FlinkTypeFactory.leastRestrictive for composite types.

[FLINK-7922] [table] Fix FlinkTypeFactory.leastRestrictive for composite types.

This closes #4929.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eb299576
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eb299576
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eb299576

Branch: refs/heads/release-1.3
Commit: eb2995762ea441e5fdc0097263b9eaa5dc674aa1
Parents: f6ac86f
Author: Rong Rong <ro...@uber.com>
Authored: Tue Oct 31 11:05:38 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 21:25:33 2017 +0100

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 39 ++++++++------
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 28 ++++++++++
 .../flink/table/api/stream/sql/UnionTest.scala  | 55 ++++++++++++++++++++
 .../table/expressions/ScalarOperatorsTest.scala | 14 +++--
 .../datastream/DataStreamCalcITCase.scala       | 20 +++++--
 .../flink/table/utils/CommonTestData.scala      |  5 ++
 6 files changed, 139 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eb299576/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 853c3e3..2486813 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -275,30 +275,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
     val type0 = types.get(0)
     if (type0.getSqlTypeName != null) {
-      val resultType = resolveAny(types)
-      if (resultType != null) {
-        return resultType
+      val resultType = resolveAllIdenticalTypes(types)
+      if (resultType.isDefined) {
+        // result type for identical types
+        return resultType.get
       }
     }
+    // fall back to super
     super.leastRestrictive(types)
   }
 
-  private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+  private def resolveAllIdenticalTypes(types: util.List[RelDataType]): Option[RelDataType] = {
     val allTypes = types.asScala
-    val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
-    if (hasAny) {
-      val head = allTypes.head
-      // only allow ANY with exactly the same GenericRelDataType for all types
-      if (allTypes.forall(_ == head)) {
-        val nullable = allTypes.exists(
-          sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
-        )
-        createTypeWithNullability(head, nullable)
-      } else {
+
+    val head = allTypes.head
+    // check if all types are the same
+    if (allTypes.forall(_ == head)) {
+      // types are the same, check nullability
+      val nullable = allTypes
+        .exists(sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL)
+      // return type with nullability
+      Some(createTypeWithNullability(head, nullable))
+    } else {
+      // types are not all the same
+      if (allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
+        // one of the type was ANY.
+        // we cannot generate a common type if it differs from other types.
         throw TableException("Generic ANY types must have a common type information.")
+      } else {
+        // cannot resolve a common type for different input types
+        None
       }
-    } else {
-      null
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eb299576/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index ba0326a..2920a32 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
@@ -82,4 +83,31 @@ class SetOperatorsTest extends TableTestBase {
     )
   }
 
+  @Test
+  def testUnionNullableTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+    val in = t.select('a)
+      .unionAll(
+        t.select(('c > 0) ? ('b, Null(createTypeInformation[(Int, String)]))))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "CASE(>(c, 0), b, null) AS _c0")
+      ),
+      term("union", "a")
+    )
+
+    util.verifyTable(in, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eb299576/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
new file mode 100644
index 0000000..cc2caa1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UnionTest extends TableTestBase {
+
+  @Test
+  def testUnionAllNullableCompositeType() = {
+    val streamUtil = streamTestUtil()
+    streamUtil.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
+      ),
+      term("union all", "a")
+    )
+
+    streamUtil.verifySql(
+      "SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A",
+      expected
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eb299576/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 5a82a8f..9dee194 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.api.scala._
@@ -138,7 +139,6 @@ class ScalarOperatorsTest extends ExpressionTestBase {
 
   @Test
   def testOtherExpressions(): Unit = {
-
     // nested field null type
     testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
     testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
@@ -149,6 +149,10 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
     testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
 
+    // array element access test
+    testSqlApi("CASE WHEN f14 IS NOT NULL THEN f14[1] ELSE NULL END", "1")
+    testSqlApi("CASE WHEN f15 IS NOT NULL THEN f15[1] ELSE NULL END", "(1,a)")
+
     // boolean literals
     testAllApis(
       true,
@@ -250,7 +254,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   def testData = {
-    val testData = new Row(14)
+    val testData = new Row(16)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -265,6 +269,8 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testData.setField(11, false)
     testData.setField(12, null)
     testData.setField(13, Row.of("foo", null))
+    testData.setField(14, Array[Integer](1,2))
+    testData.setField(15, Array[(Int, String)]((1,"a"), (2, "b")))
     testData
   }
 
@@ -283,7 +289,9 @@ class ScalarOperatorsTest extends ExpressionTestBase {
       Types.STRING,
       Types.BOOLEAN,
       Types.BOOLEAN,
-      Types.ROW(Types.STRING, Types.STRING)
+      Types.ROW(Types.STRING, Types.STRING),
+      Types.OBJECT_ARRAY(Types.INT),
+      Types.OBJECT_ARRAY(createTypeInformation[(Int, String)])
       ).asInstanceOf[TypeInformation[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eb299576/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index e7fb15b..d826127 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2}
+import org.apache.flink.table.utils.CommonTestData.NonPojo
 import org.apache.flink.table.utils.UserDefinedFunctionTestUtils
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -96,9 +97,22 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  class NonPojo {
-    val x = new java.util.HashMap[String, String]()
+  @Test
+  def testUnionWithCompositeType(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val s1 = env.fromElements((1, (1, "a")), (2, (2, "b")))
+      .toTable(tEnv, 'a, 'b)
+    val s2 = env.fromElements(((3, "c"), 3), ((4, "d"), 4))
+      .toTable(tEnv, 'a, 'b)
 
-    override def toString: String = x.toString
+    val result = s1.unionAll(s2.select('b, 'a)).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,(1,a)", "2,(2,b)", "3,(3,c)", "4,(4,d)")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eb299576/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index 6a5c52f..bb4301c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -175,4 +175,9 @@ object CommonTestData {
     }
   }
 
+  class NonPojo {
+    val x = new java.util.HashMap[String, String]()
+
+    override def toString: String = x.toString
+  }
 }