You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/09/26 14:50:33 UTC
[1/2] flink git commit: [FLINK-7596] [table] Fix bug during Set
Operation (Union, Minus ... ) with Any(GenericRelDataType)
Repository: flink
Updated Branches:
refs/heads/master 2d393e882 -> 2b82578ab
[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)
This closes #4658.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62ebda3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62ebda3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62ebda3f
Branch: refs/heads/master
Commit: 62ebda3f99f0d311676e98d6ae1e48c2322a39cb
Parents: 2d393e8
Author: Xpray <le...@gmail.com>
Authored: Fri Sep 15 08:24:17 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 26 16:38:33 2017 +0200
----------------------------------------------------------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 37 ++++++++++++++++++++
.../stream/table/SetOperatorsITCase.scala | 17 +++++++++
2 files changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 637e8cc..449b198 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,6 +18,9 @@
package org.apache.flink.table.calcite
+import java.util
+import java.util.List
+
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`._
@@ -244,6 +247,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
canonize(newType)
}
+
+ private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
+ val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
+ val nullable = types.asScala.exists(
+ sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+ )
+ if (hasAny) {
+ if (types.get(0).isInstanceOf[GenericRelDataType] &&
+ types.get(1).isInstanceOf[GenericRelDataType]) {
+ createTypeWithNullability(types.get(0), nullable)
+ } else {
+ throw new RuntimeException("only GenericRelDataType of ANY is supported")
+ }
+ } else {
+ null
+ }
+ }
+
+ override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
+ assert(types != null)
+ assert(types.size >= 1)
+ val type0 = types.get(0)
+ if (type0.getSqlTypeName != null) {
+ val resultType = resolveAnySqlType(types)
+ if (resultType != null) {
+ resultType
+ } else {
+ super.leastRestrictive(types)
+ }
+ } else {
+ super.leastRestrictive(types)
+ }
+ }
+
}
object FlinkTypeFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 688849e..2333c92 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -70,4 +70,21 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
val expected = mutable.MutableList("Hi", "Hallo")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testUnionWithAnyType(): Unit = {
+ val list = List((1, new NODE), (2, new NODE))
+ val list2 = List((3, new NODE), (4, new NODE))
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val s1 = tEnv.fromDataStream(env.fromCollection(list))
+ val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+ val result = s1.unionAll(s2).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ class NODE {
+ val x = new java.util.HashMap[String, String]()
+ }
}
[2/2] flink git commit: [FLINK-7596] [table] Restrict equality and
improve tests
Posted by tw...@apache.org.
[FLINK-7596] [table] Restrict equality and improve tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b82578a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b82578a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b82578a
Branch: refs/heads/master
Commit: 2b82578abec20eb97bc0d641ca19977cc3805c9e
Parents: 62ebda3
Author: twalthr <tw...@apache.org>
Authored: Tue Sep 26 16:37:31 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 26 16:49:10 2017 +0200
----------------------------------------------------------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 45 +++++++++-----------
.../stream/table/SetOperatorsITCase.scala | 16 ++++---
2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 449b198..1cc9f6b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.calcite
import java.util
-import java.util.List
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
@@ -248,39 +247,35 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
canonize(newType)
}
- private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
- val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
- val nullable = types.asScala.exists(
- sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
- )
- if (hasAny) {
- if (types.get(0).isInstanceOf[GenericRelDataType] &&
- types.get(1).isInstanceOf[GenericRelDataType]) {
- createTypeWithNullability(types.get(0), nullable)
- } else {
- throw new RuntimeException("only GenericRelDataType of ANY is supported")
- }
- } else {
- null
- }
- }
-
override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
- assert(types != null)
- assert(types.size >= 1)
val type0 = types.get(0)
if (type0.getSqlTypeName != null) {
- val resultType = resolveAnySqlType(types)
+ val resultType = resolveAny(types)
if (resultType != null) {
- resultType
+ return resultType
+ }
+ }
+ super.leastRestrictive(types)
+ }
+
+ private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+ val allTypes = types.asScala
+ val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
+ if (hasAny) {
+ val head = allTypes.head
+ // only allow ANY with exactly the same GenericRelDataType for all types
+ if (allTypes.forall(_ == head)) {
+ val nullable = allTypes.exists(
+ sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+ )
+ createTypeWithNullability(head, nullable)
} else {
- super.leastRestrictive(types)
+ throw TableException("Generic ANY types must have a common type information.")
}
} else {
- super.leastRestrictive(types)
+ null
}
}
-
}
object FlinkTypeFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 2333c92..cf195a5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -73,18 +73,24 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
@Test
def testUnionWithAnyType(): Unit = {
- val list = List((1, new NODE), (2, new NODE))
- val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
- val s1 = tEnv.fromDataStream(env.fromCollection(list))
- val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+
+ StreamITCase.testResults = mutable.MutableList()
+ val s1 = env.fromElements((1, new NonPojo), (2, new NonPojo)).toTable(tEnv, 'a, 'b)
+ val s2 = env.fromElements((3, new NonPojo), (4, new NonPojo)).toTable(tEnv, 'a, 'b)
+
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
+
+ val expected = mutable.MutableList("1,{}", "2,{}", "3,{}", "4,{}")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
- class NODE {
+ class NonPojo {
val x = new java.util.HashMap[String, String]()
+
+ override def toString: String = x.toString
}
}