You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/07/24 11:42:03 UTC
flink git commit: [FLINK-5750] [table] Fix incorrect translation of n-ary Union.
Repository: flink
Updated Branches:
refs/heads/master 6022225a2 -> 22b322044
[FLINK-5750] [table] Fix incorrect translation of n-ary Union.
In certain cases, Calcite produces union operators with more than two input relations.
However, Flink's translation rules only considered the first two relations.
This closes #6341.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22b32204
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22b32204
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22b32204
Branch: refs/heads/master
Commit: 22b3220448f0c2fe43406dbdac2626b67a43b088
Parents: 6022225
Author: Alexander Koltsov <Al...@epam.com>
Authored: Tue Jul 10 16:45:12 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 24 13:41:19 2018 +0200
----------------------------------------------------------------------
.../flink/table/calcite/FlinkPlannerImpl.scala | 2 -
.../table/plan/nodes/dataset/DataSetUnion.scala | 34 +++++++++--------
.../plan/nodes/datastream/DataStreamUnion.scala | 34 ++++++++++-------
.../plan/rules/dataSet/DataSetUnionRule.scala | 13 +++++--
.../rules/datastream/DataStreamUnionRule.scala | 13 +++++--
.../flink/table/api/ExternalCatalogTest.scala | 5 +++
.../flink/table/api/TableSourceTest.scala | 1 +
.../table/api/batch/sql/GroupingSetsTest.scala | 6 +++
.../table/api/batch/sql/SetOperatorsTest.scala | 39 +++++++++++++++++++-
.../api/batch/table/SetOperatorsTest.scala | 4 ++
.../api/stream/StreamTableEnvironmentTest.scala | 1 +
.../table/api/stream/sql/SetOperatorsTest.scala | 33 +++++++++++++++++
.../flink/table/api/stream/sql/UnionTest.scala | 2 +
.../api/stream/table/SetOperatorsTest.scala | 2 +
.../plan/TimeIndicatorConversionTest.scala | 1 +
.../runtime/batch/sql/SetOperatorsITCase.scala | 16 ++++++++
.../flink/table/utils/TableTestBase.scala | 7 ++++
.../src/test/scala/resources/testUnion0.out | 2 +-
.../src/test/scala/resources/testUnion1.out | 2 +-
.../test/scala/resources/testUnionStream0.out | 2 +-
20 files changed, 176 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index d573086..de5b899 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,10 +19,8 @@
package org.apache.flink.table.calcite
import java.util
-import java.util.Properties
import com.google.common.collect.ImmutableList
-import org.apache.calcite.config.{CalciteConnectionConfig, CalciteConnectionConfigImpl, CalciteConnectionProperty}
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.RelOptTable.ViewExpander
import org.apache.calcite.plan._
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index 1cdd198..bb94f28 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -18,12 +18,15 @@
package org.apache.flink.table.plan.nodes.dataset
+import java.util.{List => JList}
+
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{SetOp, Union}
import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException}
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
@@ -36,22 +39,24 @@ import scala.collection.JavaConverters._
class DataSetUnion(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
+ inputs: JList[RelNode],
rowRelDataType: RelDataType)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
+ extends Union(cluster, traitSet, inputs, true)
with DataSetRel {
override def deriveRowType() = rowRelDataType
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+
+ if (!all) {
+ throw new TableException("DataSetUnion only supports UNION ALL.")
+ }
+
new DataSetUnion(
cluster,
traitSet,
- inputs.get(0),
- inputs.get(1),
- rowRelDataType
- )
+ inputs,
+ rowRelDataType)
}
override def toString: String = {
@@ -81,14 +86,13 @@ class DataSetUnion(
tableEnv: BatchTableEnvironment,
queryConfig: BatchQueryConfig): DataSet[Row] = {
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
-
- leftDataSet.union(rightDataSet)
+ getInputs
+ .asScala
+ .map(_.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig))
+ .reduce((dataSetLeft, dataSetRight) => dataSetLeft.union(dataSetRight))
}
private def unionSelectionToString: String = {
rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 7258ec8..3355971 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -18,13 +18,18 @@
package org.apache.flink.table.plan.nodes.datastream
+import java.util.{List => JList}
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.core.{SetOp, Union}
+import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.types.CRow
+import scala.collection.JavaConverters._
+
/**
* Flink RelNode which matches along with Union.
*
@@ -32,22 +37,24 @@ import org.apache.flink.table.runtime.types.CRow
class DataStreamUnion(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
+ inputs: JList[RelNode],
schema: RowSchema)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
+ extends Union(cluster, traitSet, inputs, true)
with DataStreamRel {
override def deriveRowType() = schema.relDataType
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+ override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+
+ if (!all) {
+ throw new TableException("DataStreamUnion only supports UNION ALL.")
+ }
+
new DataStreamUnion(
cluster,
traitSet,
- inputs.get(0),
- inputs.get(1),
- schema
- )
+ inputs,
+ schema)
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -62,9 +69,10 @@ class DataStreamUnion(
tableEnv: StreamTableEnvironment,
queryConfig: StreamQueryConfig): DataStream[CRow] = {
- val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
- val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
- leftDataSet.union(rightDataSet)
+ getInputs
+ .asScala
+ .map(_.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig))
+ .reduce((dataSetLeft, dataSetRight) => dataSetLeft.union(dataSetRight))
}
private def unionSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
index d4bbb6b..137e59d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
+import scala.collection.JavaConverters._
+
class DataSetUnionRule
extends ConverterRule(
classOf[FlinkLogicalUnion],
@@ -46,14 +48,17 @@ class DataSetUnionRule
def convert(rel: RelNode): RelNode = {
val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
- val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASET)
- val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASET)
+
+ val newInputs = union
+ .getInputs
+ .asScala
+ .map(RelOptRule.convert(_, FlinkConventions.DATASET))
+ .asJava
new DataSetUnion(
rel.getCluster,
traitSet,
- convLeft,
- convRight,
+ newInputs,
rel.getRowType)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
index 8402f6d..6223652 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
import org.apache.flink.table.plan.schema.RowSchema
+import scala.collection.JavaConverters._
+
class DataStreamUnionRule
extends ConverterRule(
classOf[FlinkLogicalUnion],
@@ -37,14 +39,17 @@ class DataStreamUnionRule
def convert(rel: RelNode): RelNode = {
val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
- val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASTREAM)
- val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASTREAM)
+
+ val newInputs = union
+ .getInputs
+ .asScala
+ .map(RelOptRule.convert(_, FlinkConventions.DATASTREAM))
+ .asJava
new DataStreamUnion(
rel.getCluster,
traitSet,
- convLeft,
- convRight,
+ newInputs,
new RowSchema(rel.getRowType))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
index 7f567f9..17f51f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -61,6 +61,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceBatchTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
),
+ term("all", "true"),
term("union", "_c0", "e", "_c2")
)
@@ -90,6 +91,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceBatchTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS EXPR$0", "b", "c")
),
+ term("all", "true"),
term("union", "EXPR$0", "e", "g"))
util.verifySql(sqlQuery, expected)
@@ -124,6 +126,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceStreamTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
),
+ term("all", "true"),
term("union all", "_c0", "e", "_c2")
)
@@ -153,6 +156,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceStreamTableNode(table1Path, table1ProjectedFields),
term("select", "*(a, 2) AS EXPR$0", "b", "c")
),
+ term("all", "true"),
term("union all", "EXPR$0", "e", "g"))
util.verifySql(sqlQuery, expected)
@@ -185,6 +189,7 @@ class ExternalCatalogTest extends TableTestBase {
sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
),
+ term("all", "true"),
term("union", "_c0", "e", "_c2")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 050f1a1..ee2e749 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -60,6 +60,7 @@ class TableSourceTest extends TableTestBase {
"table2",
Array("name", "id", "amount", "price"),
"'amount > 2"),
+ term("all", "true"),
term("union", "name, id, amount, price")
)
util.verifyTable(result, expected)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
index 57a4c5a..0bc88c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
@@ -56,6 +56,7 @@ class GroupingSetsTest extends TableTestBase {
),
term("select", "null AS b", "c", "a", "2 AS g")
),
+ term("all", "true"),
term("union", "b", "c", "a", "g")
)
@@ -130,12 +131,15 @@ class GroupingSetsTest extends TableTestBase {
"DataSetUnion",
group1,
group2,
+ term("all", "true"),
term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
),
group3,
+ term("all", "true"),
term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
),
group4,
+ term("all", "true"),
term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
)
@@ -195,9 +199,11 @@ class GroupingSetsTest extends TableTestBase {
"DataSetUnion",
group1,
group2,
+ term("all", "true"),
term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
),
group3,
+ term("all", "true"),
term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
index e6f4a46..a6e47ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
@@ -18,13 +18,13 @@
package org.apache.flink.table.api.batch.sql
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
-import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
import org.junit.{Ignore, Test}
class SetOperatorsTest extends TableTestBase {
@@ -190,6 +190,7 @@ class SetOperatorsTest extends TableTestBase {
batchTableNode(0),
term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
),
+ term("all", "true"),
term("union", "a")
)
@@ -219,9 +220,43 @@ class SetOperatorsTest extends TableTestBase {
batchTableNode(0),
term("select", "b")
),
+ term("all", "true"),
term("union", "a")
)
util.verifyJavaSql("SELECT a FROM A UNION ALL SELECT b FROM A", expected)
}
+
+ @Test
+ def testValuesWithCast(): Unit = {
+ val util = batchTestUtil()
+
+ val expected = naryNode(
+ "DataSetUnion",
+ List(
+ unaryNode("DataSetCalc",
+ values("DataSetValues",
+ tuples(List("0")),
+ "values=[ZERO]"),
+ term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
+ unaryNode("DataSetCalc",
+ values("DataSetValues",
+ tuples(List("0")),
+ "values=[ZERO]"),
+ term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
+ unaryNode("DataSetCalc",
+ values("DataSetValues",
+ tuples(List("0")),
+ "values=[ZERO]"),
+ term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
+ ),
+ term("all", "true"),
+ term("union", "EXPR$0, EXPR$1")
+ )
+
+ util.verifySql(
+ "VALUES (1, cast(1 as BIGINT) ),(2, cast(2 as BIGINT)),(3, cast(3 as BIGINT))",
+ expected
+ )
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 929ce9c..9226200 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -102,6 +102,7 @@ class SetOperatorsTest extends TableTestBase {
batchTableNode(0),
term("select", "CASE(>(c, 0), b, null) AS _c0")
),
+ term("all", "true"),
term("union", "a")
)
@@ -130,6 +131,7 @@ class SetOperatorsTest extends TableTestBase {
batchTableNode(0),
term("select", "b")
),
+ term("all", "true"),
term("union", "a")
)
@@ -165,6 +167,7 @@ class SetOperatorsTest extends TableTestBase {
term("select", "a", "b", "c"),
term("where", ">(a, 0)")
),
+ term("all", "true"),
term("union", "a", "b", "c")
),
term("groupBy", "b"),
@@ -238,6 +241,7 @@ class SetOperatorsTest extends TableTestBase {
batchTableNode(1),
term("select", "b", "c")
),
+ term("all", "true"),
term("union", "b", "c")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 863d07b..30116cd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -60,6 +60,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
"DataStreamUnion",
streamTableNode(1),
streamTableNode(0),
+ term("all", "true"),
term("union all", "d, e, f"))
util.verifyTable(sqlTable2, expected2)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
index 266e3ff..97dbe0d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -170,4 +170,37 @@ class SetOperatorsTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, expected)
}
+
+ @Test
+ def testValuesWithCast(): Unit = {
+ val util = batchTestUtil()
+
+ val expected = naryNode(
+ "DataSetUnion",
+ List(
+ unaryNode("DataSetCalc",
+ values("DataSetValues",
+ tuples(List("0")),
+ "values=[ZERO]"),
+ term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
+ unaryNode("DataSetCalc",
+ values("DataSetValues",
+ tuples(List("0")),
+ "values=[ZERO]"),
+ term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
+ unaryNode("DataSetCalc",
+ values("DataSetValues",
+ tuples(List("0")),
+ "values=[ZERO]"),
+ term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
+ ),
+ term("all", "true"),
+ term("union", "EXPR$0, EXPR$1")
+ )
+
+ util.verifySql(
+ "VALUES (1, cast(1 as BIGINT) ),(2, cast(2 as BIGINT)),(3, cast(3 as BIGINT))",
+ expected
+ )
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
index 7e807f6..b8bd222 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
@@ -46,6 +46,7 @@ class UnionTest extends TableTestBase {
streamTableNode(0),
term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
),
+ term("all", "true"),
term("union all", "a")
)
@@ -75,6 +76,7 @@ class UnionTest extends TableTestBase {
streamTableNode(0),
term("select", "b")
),
+ term("all", "true"),
term("union all", "a")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
index e84c630..dfbaf40 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -55,6 +55,7 @@ class SetOperatorsTest extends TableTestBase {
term("select", "a", "b", "c"),
term("where", ">(a, 0)")
),
+ term("all", "true"),
term("union all", "a", "b", "c")
),
term("groupBy", "b"),
@@ -88,6 +89,7 @@ class SetOperatorsTest extends TableTestBase {
streamTableNode(1),
term("select", "b", "c")
),
+ term("all", "true"),
term("union all", "b", "c")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index faca7f9..6a77f12 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -221,6 +221,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
streamTableNode(0),
term("select", "rowtime")
),
+ term("all", "true"),
term("union all", "rowtime")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
index d965e0c..396310f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -123,6 +123,22 @@ class SetOperatorsITCase(
}
@Test
+ def testValuesWithCast(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
+ "(2, cast(2 as BIGINT))," +
+ "(3, cast(3 as BIGINT))"
+
+ val result = tEnv.sqlQuery(sqlQuery)
+ val results = result.toDataSet[Row].collect()
+
+ val expected = "1,1\n2,2\n3,3"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
def testExcept(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index f414dd4..5e1aabe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -122,6 +122,13 @@ object TableTestUtil {
|""".stripMargin.stripLineEnd
}
+ def naryNode(node: String, inputs: List[AnyRef], term: String*): String = {
+ val strInputs = inputs.mkString("\n")
+ s"""$node(${term.mkString(", ")})
+ |$strInputs
+ |""".stripMargin.stripLineEnd
+ }
+
def values(node: String, term: String*): String = {
s"$node(${term.mkString(", ")})"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index 5fbd1b5..b4d12b6 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_1]])
== Optimized Logical Plan ==
-DataSetUnion(union=[count, word])
+DataSetUnion(all=[true], union=[count, word])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index d7d343b..4822772 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_1]])
== Optimized Logical Plan ==
-DataSetUnion(union=[count, word])
+DataSetUnion(all=[true], union=[count, word])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])
http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index 2d19bdc..af6c691 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
-DataStreamUnion(union all=[count, word])
+DataStreamUnion(all=[true], union all=[count, word])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])