You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/09/26 14:50:33 UTC

[1/2] flink git commit: [FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

Repository: flink
Updated Branches:
  refs/heads/master 2d393e882 -> 2b82578ab


[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

This closes #4658.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62ebda3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62ebda3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62ebda3f

Branch: refs/heads/master
Commit: 62ebda3f99f0d311676e98d6ae1e48c2322a39cb
Parents: 2d393e8
Author: Xpray <le...@gmail.com>
Authored: Fri Sep 15 08:24:17 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 26 16:38:33 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 37 ++++++++++++++++++++
 .../stream/table/SetOperatorsITCase.scala       | 17 +++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 637e8cc..449b198 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.calcite
 
+import java.util
+import java.util.List
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
@@ -244,6 +247,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
     canonize(newType)
   }
+
+  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
+    val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
+    val nullable = types.asScala.exists(
+      sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+    )
+    if (hasAny) {
+      if (types.get(0).isInstanceOf[GenericRelDataType] &&
+        types.get(1).isInstanceOf[GenericRelDataType]) {
+        createTypeWithNullability(types.get(0), nullable)
+      } else {
+        throw new RuntimeException("only GenericRelDataType of ANY is supported")
+      }
+    } else {
+      null
+    }
+  }
+
+  override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
+    assert(types != null)
+    assert(types.size >= 1)
+    val type0 = types.get(0)
+    if (type0.getSqlTypeName != null) {
+      val resultType = resolveAnySqlType(types)
+      if (resultType != null) {
+        resultType
+      } else {
+        super.leastRestrictive(types)
+      }
+    } else {
+      super.leastRestrictive(types)
+    }
+  }
+
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 688849e..2333c92 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -70,4 +70,21 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
     val expected = mutable.MutableList("Hi", "Hallo")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUnionWithAnyType(): Unit = {
+    val list = List((1, new NODE), (2, new NODE))
+    val list2 = List((3, new NODE), (4, new NODE))
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val s1 = tEnv.fromDataStream(env.fromCollection(list))
+    val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+    val result = s1.unionAll(s2).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  class NODE {
+    val x = new java.util.HashMap[String, String]()
+  }
 }


[2/2] flink git commit: [FLINK-7596] [table] Restrict equality and improve tests

Posted by tw...@apache.org.
[FLINK-7596] [table] Restrict equality and improve tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b82578a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b82578a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b82578a

Branch: refs/heads/master
Commit: 2b82578abec20eb97bc0d641ca19977cc3805c9e
Parents: 62ebda3
Author: twalthr <tw...@apache.org>
Authored: Tue Sep 26 16:37:31 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 26 16:49:10 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 45 +++++++++-----------
 .../stream/table/SetOperatorsITCase.scala       | 16 ++++---
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 449b198..1cc9f6b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.calcite
 
 import java.util
-import java.util.List
 
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
@@ -248,39 +247,35 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     canonize(newType)
   }
 
-  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
-    val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
-    val nullable = types.asScala.exists(
-      sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
-    )
-    if (hasAny) {
-      if (types.get(0).isInstanceOf[GenericRelDataType] &&
-        types.get(1).isInstanceOf[GenericRelDataType]) {
-        createTypeWithNullability(types.get(0), nullable)
-      } else {
-        throw new RuntimeException("only GenericRelDataType of ANY is supported")
-      }
-    } else {
-      null
-    }
-  }
-
   override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
-    assert(types != null)
-    assert(types.size >= 1)
     val type0 = types.get(0)
     if (type0.getSqlTypeName != null) {
-      val resultType = resolveAnySqlType(types)
+      val resultType = resolveAny(types)
       if (resultType != null) {
-        resultType
+        return resultType
+      }
+    }
+    super.leastRestrictive(types)
+  }
+
+  private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+    val allTypes = types.asScala
+    val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
+    if (hasAny) {
+      val head = allTypes.head
+      // only allow ANY with exactly the same GenericRelDataType for all types
+      if (allTypes.forall(_ == head)) {
+        val nullable = allTypes.exists(
+          sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+        )
+        createTypeWithNullability(head, nullable)
       } else {
-        super.leastRestrictive(types)
+        throw TableException("Generic ANY types must have a common type information.")
       }
     } else {
-      super.leastRestrictive(types)
+      null
     }
   }
-
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 2333c92..cf195a5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -73,18 +73,24 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testUnionWithAnyType(): Unit = {
-    val list = List((1, new NODE), (2, new NODE))
-    val list2 = List((3, new NODE), (4, new NODE))
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    val s1 = tEnv.fromDataStream(env.fromCollection(list))
-    val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+
+    StreamITCase.testResults = mutable.MutableList()
+    val s1 = env.fromElements((1, new NonPojo), (2, new NonPojo)).toTable(tEnv, 'a, 'b)
+    val s2 = env.fromElements((3, new NonPojo), (4, new NonPojo)).toTable(tEnv, 'a, 'b)
+
     val result = s1.unionAll(s2).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
+
+    val expected = mutable.MutableList("1,{}", "2,{}", "3,{}", "4,{}")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  class NODE {
+  class NonPojo {
     val x = new java.util.HashMap[String, String]()
+
+    override def toString: String = x.toString
   }
 }