You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/07/24 11:42:03 UTC

flink git commit: [FLINK-5750] [table] Fix incorrect translation of n-ary Union.

Repository: flink
Updated Branches:
  refs/heads/master 6022225a2 -> 22b322044


[FLINK-5750] [table] Fix incorrect translation of n-ary Union.

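In certain cases, Calcite produces union operators with more than two input relations.
However, Flink's translation rules only considered the first two relations and ignored any further inputs. The union nodes and their translation rules now handle all input relations.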

This closes #6341.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22b32204
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22b32204
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22b32204

Branch: refs/heads/master
Commit: 22b3220448f0c2fe43406dbdac2626b67a43b088
Parents: 6022225
Author: Alexander Koltsov <Al...@epam.com>
Authored: Tue Jul 10 16:45:12 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Jul 24 13:41:19 2018 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkPlannerImpl.scala  |  2 -
 .../table/plan/nodes/dataset/DataSetUnion.scala | 34 +++++++++--------
 .../plan/nodes/datastream/DataStreamUnion.scala | 34 ++++++++++-------
 .../plan/rules/dataSet/DataSetUnionRule.scala   | 13 +++++--
 .../rules/datastream/DataStreamUnionRule.scala  | 13 +++++--
 .../flink/table/api/ExternalCatalogTest.scala   |  5 +++
 .../flink/table/api/TableSourceTest.scala       |  1 +
 .../table/api/batch/sql/GroupingSetsTest.scala  |  6 +++
 .../table/api/batch/sql/SetOperatorsTest.scala  | 39 +++++++++++++++++++-
 .../api/batch/table/SetOperatorsTest.scala      |  4 ++
 .../api/stream/StreamTableEnvironmentTest.scala |  1 +
 .../table/api/stream/sql/SetOperatorsTest.scala | 33 +++++++++++++++++
 .../flink/table/api/stream/sql/UnionTest.scala  |  2 +
 .../api/stream/table/SetOperatorsTest.scala     |  2 +
 .../plan/TimeIndicatorConversionTest.scala      |  1 +
 .../runtime/batch/sql/SetOperatorsITCase.scala  | 16 ++++++++
 .../flink/table/utils/TableTestBase.scala       |  7 ++++
 .../src/test/scala/resources/testUnion0.out     |  2 +-
 .../src/test/scala/resources/testUnion1.out     |  2 +-
 .../test/scala/resources/testUnionStream0.out   |  2 +-
 20 files changed, 176 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index d573086..de5b899 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -19,10 +19,8 @@
 package org.apache.flink.table.calcite
 
 import java.util
-import java.util.Properties
 
 import com.google.common.collect.ImmutableList
-import org.apache.calcite.config.{CalciteConnectionConfig, CalciteConnectionConfigImpl, CalciteConnectionProperty}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index 1cdd198..bb94f28 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -18,12 +18,15 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
+import java.util.{List => JList}
+
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{SetOp, Union}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment}
+import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvironment, TableException}
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
@@ -36,22 +39,24 @@ import scala.collection.JavaConverters._
 class DataSetUnion(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
+    inputs: JList[RelNode],
     rowRelDataType: RelDataType)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  extends Union(cluster, traitSet, inputs, true)
   with DataSetRel {
 
   override def deriveRowType() = rowRelDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+
+    if (!all) {
+      throw new TableException("DataSetUnion only supports UNION ALL.")
+    }
+
     new DataSetUnion(
       cluster,
       traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      rowRelDataType
-    )
+      inputs,
+      rowRelDataType)
   }
 
   override def toString: String = {
@@ -81,14 +86,13 @@ class DataSetUnion(
       tableEnv: BatchTableEnvironment,
       queryConfig: BatchQueryConfig): DataSet[Row] = {
 
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig)
-
-    leftDataSet.union(rightDataSet)
+    getInputs
+      .asScala
+      .map(_.asInstanceOf[DataSetRel].translateToPlan(tableEnv, queryConfig))
+      .reduce((dataSetLeft, dataSetRight) => dataSetLeft.union(dataSetRight))
   }
 
   private def unionSelectionToString: String = {
     rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 7258ec8..3355971 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -18,13 +18,18 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
+import java.util.{List => JList}
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rel.core.{SetOp, Union}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with Union.
   *
@@ -32,22 +37,24 @@ import org.apache.flink.table.runtime.types.CRow
 class DataStreamUnion(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    leftNode: RelNode,
-    rightNode: RelNode,
+    inputs: JList[RelNode],
     schema: RowSchema)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  extends Union(cluster, traitSet, inputs, true)
   with DataStreamRel {
 
   override def deriveRowType() = schema.relDataType
 
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+
+    if (!all) {
+      throw new TableException("DataStreamUnion only supports UNION ALL.")
+    }
+
     new DataStreamUnion(
       cluster,
       traitSet,
-      inputs.get(0),
-      inputs.get(1),
-      schema
-    )
+      inputs,
+      schema)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -62,9 +69,10 @@ class DataStreamUnion(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-    val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
-    val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
-    leftDataSet.union(rightDataSet)
+    getInputs
+      .asScala
+      .map(_.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig))
+      .reduce((dataSetLeft, dataSetRight) => dataSetLeft.union(dataSetRight))
   }
 
   private def unionSelectionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
index d4bbb6b..137e59d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetUnion
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
 
+import scala.collection.JavaConverters._
+
 class DataSetUnionRule
   extends ConverterRule(
     classOf[FlinkLogicalUnion],
@@ -46,14 +48,17 @@ class DataSetUnionRule
   def convert(rel: RelNode): RelNode = {
     val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASET)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASET)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASET)
+
+    val newInputs = union
+      .getInputs
+      .asScala
+      .map(RelOptRule.convert(_, FlinkConventions.DATASET))
+      .asJava
 
     new DataSetUnion(
       rel.getCluster,
       traitSet,
-      convLeft,
-      convRight,
+      newInputs,
       rel.getRowType)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
index 8402f6d..6223652 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
 import org.apache.flink.table.plan.schema.RowSchema
 
+import scala.collection.JavaConverters._
+
 class DataStreamUnionRule
   extends ConverterRule(
     classOf[FlinkLogicalUnion],
@@ -37,14 +39,17 @@ class DataStreamUnionRule
   def convert(rel: RelNode): RelNode = {
     val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASTREAM)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASTREAM)
+
+    val newInputs = union
+      .getInputs
+      .asScala
+      .map(RelOptRule.convert(_, FlinkConventions.DATASTREAM))
+      .asJava
 
     new DataStreamUnion(
       rel.getCluster,
       traitSet,
-      convLeft,
-      convRight,
+      newInputs,
       new RowSchema(rel.getRowType))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
index 7f567f9..17f51f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -61,6 +61,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceBatchTableNode(table1Path, table1ProjectedFields),
         term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
       ),
+      term("all", "true"),
       term("union", "_c0", "e", "_c2")
     )
 
@@ -90,6 +91,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceBatchTableNode(table1Path, table1ProjectedFields),
         term("select", "*(a, 2) AS EXPR$0", "b", "c")
       ),
+      term("all", "true"),
       term("union", "EXPR$0", "e", "g"))
 
     util.verifySql(sqlQuery, expected)
@@ -124,6 +126,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceStreamTableNode(table1Path, table1ProjectedFields),
         term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
       ),
+      term("all", "true"),
       term("union all", "_c0", "e", "_c2")
     )
 
@@ -153,6 +156,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceStreamTableNode(table1Path, table1ProjectedFields),
         term("select", "*(a, 2) AS EXPR$0", "b", "c")
       ),
+      term("all", "true"),
       term("union all", "EXPR$0", "e", "g"))
 
     util.verifySql(sqlQuery, expected)
@@ -185,6 +189,7 @@ class ExternalCatalogTest extends TableTestBase {
         sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
         term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
       ),
+      term("all", "true"),
       term("union", "_c0", "e", "_c2")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 050f1a1..ee2e749 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -60,6 +60,7 @@ class TableSourceTest extends TableTestBase {
         "table2",
         Array("name", "id", "amount", "price"),
         "'amount > 2"),
+      term("all", "true"),
       term("union", "name, id, amount, price")
     )
     util.verifyTable(result, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
index 57a4c5a..0bc88c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala
@@ -56,6 +56,7 @@ class GroupingSetsTest extends TableTestBase {
         ),
         term("select", "null AS b", "c", "a", "2 AS g")
       ),
+      term("all", "true"),
       term("union", "b", "c", "a", "g")
     )
 
@@ -130,12 +131,15 @@ class GroupingSetsTest extends TableTestBase {
           "DataSetUnion",
           group1,
           group2,
+          term("all", "true"),
           term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
         ),
         group3,
+        term("all", "true"),
         term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
       ),
       group4,
+      term("all", "true"),
       term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
     )
 
@@ -195,9 +199,11 @@ class GroupingSetsTest extends TableTestBase {
         "DataSetUnion",
         group1,
         group2,
+        term("all", "true"),
         term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
       ),
       group3,
+      term("all", "true"),
       term("union", "b", "c", "a", "g", "gb", "gc", "gib", "gic", "gid")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
index e6f4a46..a6e47ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
@@ -18,13 +18,13 @@
 
 package org.apache.flink.table.api.batch.sql
 
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
-import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.{Ignore, Test}
 
 class SetOperatorsTest extends TableTestBase {
@@ -190,6 +190,7 @@ class SetOperatorsTest extends TableTestBase {
         batchTableNode(0),
         term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
       ),
+      term("all", "true"),
       term("union", "a")
     )
 
@@ -219,9 +220,43 @@ class SetOperatorsTest extends TableTestBase {
         batchTableNode(0),
         term("select", "b")
       ),
+      term("all", "true"),
       term("union", "a")
     )
 
     util.verifyJavaSql("SELECT a FROM A UNION ALL SELECT b FROM A", expected)
   }
+
+  @Test
+  def testValuesWithCast(): Unit = {
+    val util = batchTestUtil()
+
+    val expected = naryNode(
+      "DataSetUnion",
+      List(
+        unaryNode("DataSetCalc",
+          values("DataSetValues",
+            tuples(List("0")),
+            "values=[ZERO]"),
+          term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
+        unaryNode("DataSetCalc",
+          values("DataSetValues",
+            tuples(List("0")),
+            "values=[ZERO]"),
+          term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
+        unaryNode("DataSetCalc",
+          values("DataSetValues",
+            tuples(List("0")),
+            "values=[ZERO]"),
+          term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
+      ),
+      term("all", "true"),
+      term("union", "EXPR$0, EXPR$1")
+    )
+
+    util.verifySql(
+      "VALUES (1, cast(1 as BIGINT) ),(2, cast(2 as BIGINT)),(3, cast(3 as BIGINT))",
+      expected
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 929ce9c..9226200 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -102,6 +102,7 @@ class SetOperatorsTest extends TableTestBase {
         batchTableNode(0),
         term("select", "CASE(>(c, 0), b, null) AS _c0")
       ),
+      term("all", "true"),
       term("union", "a")
     )
 
@@ -130,6 +131,7 @@ class SetOperatorsTest extends TableTestBase {
         batchTableNode(0),
         term("select", "b")
       ),
+      term("all", "true"),
       term("union", "a")
     )
 
@@ -165,6 +167,7 @@ class SetOperatorsTest extends TableTestBase {
             term("select", "a", "b", "c"),
             term("where", ">(a, 0)")
           ),
+          term("all", "true"),
           term("union", "a", "b", "c")
         ),
         term("groupBy", "b"),
@@ -238,6 +241,7 @@ class SetOperatorsTest extends TableTestBase {
         batchTableNode(1),
         term("select", "b", "c")
       ),
+      term("all", "true"),
       term("union", "b", "c")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 863d07b..30116cd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -60,6 +60,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
       "DataStreamUnion",
       streamTableNode(1),
       streamTableNode(0),
+      term("all", "true"),
       term("union all", "d, e, f"))
 
     util.verifyTable(sqlTable2, expected2)

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
index 266e3ff..97dbe0d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -170,4 +170,37 @@ class SetOperatorsTest extends TableTestBase {
 
     streamUtil.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testValuesWithCast(): Unit = {
+    val util = batchTestUtil()
+
+    val expected = naryNode(
+      "DataSetUnion",
+      List(
+        unaryNode("DataSetCalc",
+          values("DataSetValues",
+            tuples(List("0")),
+            "values=[ZERO]"),
+          term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
+        unaryNode("DataSetCalc",
+          values("DataSetValues",
+            tuples(List("0")),
+            "values=[ZERO]"),
+          term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
+        unaryNode("DataSetCalc",
+          values("DataSetValues",
+            tuples(List("0")),
+            "values=[ZERO]"),
+          term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
+      ),
+      term("all", "true"),
+      term("union", "EXPR$0, EXPR$1")
+    )
+
+    util.verifySql(
+      "VALUES (1, cast(1 as BIGINT) ),(2, cast(2 as BIGINT)),(3, cast(3 as BIGINT))",
+      expected
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
index 7e807f6..b8bd222 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
@@ -46,6 +46,7 @@ class UnionTest extends TableTestBase {
         streamTableNode(0),
         term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
       ),
+      term("all", "true"),
       term("union all", "a")
     )
 
@@ -75,6 +76,7 @@ class UnionTest extends TableTestBase {
         streamTableNode(0),
         term("select", "b")
       ),
+      term("all", "true"),
       term("union all", "a")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
index e84c630..dfbaf40 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -55,6 +55,7 @@ class SetOperatorsTest extends TableTestBase {
               term("select", "a", "b", "c"),
               term("where", ">(a, 0)")
             ),
+            term("all", "true"),
             term("union all", "a", "b", "c")
           ),
           term("groupBy", "b"),
@@ -88,6 +89,7 @@ class SetOperatorsTest extends TableTestBase {
         streamTableNode(1),
         term("select", "b", "c")
       ),
+      term("all", "true"),
       term("union all", "b", "c")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index faca7f9..6a77f12 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -221,6 +221,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
         streamTableNode(0),
         term("select", "rowtime")
       ),
+      term("all", "true"),
       term("union all", "rowtime")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
index d965e0c..396310f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -123,6 +123,22 @@ class SetOperatorsITCase(
   }
 
   @Test
+  def testValuesWithCast(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
+      "(2, cast(2 as BIGINT))," +
+      "(3, cast(3 as BIGINT))"
+
+    val result = tEnv.sqlQuery(sqlQuery)
+    val results = result.toDataSet[Row].collect()
+
+    val expected = "1,1\n2,2\n3,3"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testExcept(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index f414dd4..5e1aabe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -122,6 +122,13 @@ object TableTestUtil {
        |""".stripMargin.stripLineEnd
   }
 
+  def naryNode(node: String, inputs: List[AnyRef], term: String*): String = {
+    val strInputs = inputs.mkString("\n")
+    s"""$node(${term.mkString(", ")})
+       |$strInputs
+       |""".stripMargin.stripLineEnd
+  }
+
   def values(node: String, term: String*): String = {
     s"$node(${term.mkString(", ")})"
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
index 5fbd1b5..b4d12b6 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataSetTable_1]])
 
 == Optimized Logical Plan ==
-DataSetUnion(union=[count, word])
+DataSetUnion(all=[true], union=[count, word])
   DataSetScan(table=[[_DataSetTable_0]])
   DataSetScan(table=[[_DataSetTable_1]])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
index d7d343b..4822772 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataSetTable_1]])
 
 == Optimized Logical Plan ==
-DataSetUnion(union=[count, word])
+DataSetUnion(all=[true], union=[count, word])
   DataSetScan(table=[[_DataSetTable_0]])
   DataSetScan(table=[[_DataSetTable_1]])
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22b32204/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index 2d19bdc..af6c691 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_1]])
 
 == Optimized Logical Plan ==
-DataStreamUnion(union all=[count, word])
+DataStreamUnion(all=[true], union all=[count, word])
   DataStreamScan(table=[[_DataStreamTable_0]])
   DataStreamScan(table=[[_DataStreamTable_1]])