You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/28 09:21:48 UTC
[flink] branch master updated: [FLINK-12600][table-planner-blink]
Introduce various deterministic rewriting rule, which includes:
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b333ddc [FLINK-12600][table-planner-blink] Introduce various deterministic rewriting rule, which includes:
b333ddc is described below
commit b333ddc16967ae7428229d659130b61de906ef5f
Author: godfrey he <go...@163.com>
AuthorDate: Tue May 28 17:21:34 2019 +0800
[FLINK-12600][table-planner-blink] Introduce various deterministic rewriting rule, which includes:
1. FlinkLimit0RemoveRule, that rewrites `limit 0` to empty Values
2. FlinkRewriteSubQueryRule, that rewrites a Filter with condition: `(select count(*) from T) > 0` to a Filter with condition: `exists(select * from T)`
3. ReplaceIntersectWithSemiJoinRule, that rewrites distinct Intersect to a distinct Aggregate on a SEMI Join
4. ReplaceMinusWithAntiJoinRule, that rewrites distinct Minus to a distinct Aggregate on an ANTI Join
This closes #8520
---
.../table/plan/rules/FlinkBatchRuleSets.scala | 13 +-
.../table/plan/rules/FlinkStreamRuleSets.scala | 13 +-
.../plan/rules/logical/FlinkCalcMergeRule.scala | 2 +-
.../plan/rules/logical/FlinkLimit0RemoveRule.scala | 50 ++
.../plan/rules/logical/FlinkPruneEmptyRules.scala | 70 +++
.../rules/logical/FlinkRewriteSubQueryRule.scala | 168 ++++++
.../logical/ReplaceIntersectWithSemiJoinRule.scala | 61 ++
.../logical/ReplaceMinusWithAntiJoinRule.scala | 61 ++
.../logical/ReplaceSetOpWithJoinRuleBase.scala | 58 ++
.../flink/table/plan/batch/sql/LimitTest.xml | 23 +-
.../table/plan/batch/sql/SetOperatorsTest.xml | 224 ++++++++
.../flink/table/plan/batch/sql/SortLimitTest.xml | 20 +-
.../table/plan/batch/sql/SubplanReuseTest.xml | 39 ++
.../rules/logical/FlinkLimit0RemoveRuleTest.xml | 214 +++++++
.../rules/logical/FlinkPruneEmptyRulesTest.xml | 63 +++
.../ReplaceIntersectWithSemiJoinRuleTest.xml | 123 +++++
.../logical/ReplaceMinusWithAntiJoinRuleTest.xml | 123 +++++
.../subquery/FlinkRewriteSubQueryRuleTest.xml | 612 +++++++++++++++++++++
.../flink/table/plan/stream/sql/LimitTest.xml | 60 +-
.../table/plan/stream/sql/SetOperatorsTest.xml | 226 ++++++++
.../flink/table/plan/stream/sql/SortLimitTest.xml | 80 +--
.../table/plan/stream/sql/SubplanReuseTest.xml | 37 ++
.../flink/table/plan/batch/sql/LimitTest.scala | 1 -
.../table/plan/batch/sql/SetOperatorsTest.scala | 129 +++++
.../flink/table/plan/batch/sql/SortLimitTest.scala | 1 -
.../table/plan/batch/sql/SubplanReuseTest.scala | 3 +-
.../rules/logical/FlinkLimit0RemoveRuleTest.scala | 101 ++++
.../rules/logical/FlinkPruneEmptyRulesTest.scala | 73 +++
.../ReplaceIntersectWithSemiJoinRuleTest.scala | 84 +++
.../logical/ReplaceMinusWithAntiJoinRuleTest.scala | 82 +++
.../subquery/FlinkRewriteSubQueryRuleTest.scala | 211 +++++++
.../flink/table/plan/stream/sql/LimitTest.scala | 1 -
.../table/plan/stream/sql/SetOperatorsTest.scala | 127 +++++
.../table/plan/stream/sql/SortLimitTest.scala | 1 -
.../table/plan/stream/sql/SubplanReuseTest.scala | 3 +-
.../org/apache/flink/table/plan/util/pojos.scala | 10 +
.../runtime/batch/sql/Limit0RemoveITCase.scala | 98 ++++
.../runtime/stream/sql/Limit0RemoveITCase.scala | 187 +++++++
.../flink/table/runtime/utils/StreamTestSink.scala | 2 +-
.../flink/table/runtime/utils/TestSinkUtil.scala | 16 +-
40 files changed, 3303 insertions(+), 167 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index d7e45b3..1401605 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -34,6 +34,7 @@ object FlinkBatchRuleSets {
val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
+ FlinkRewriteSubQueryRule.FILTER,
FlinkSubQueryRemoveRule.FILTER,
JoinConditionTypeCoerceRule.INSTANCE,
FlinkJoinPushExpressionsRule.INSTANCE
@@ -116,7 +117,9 @@ object FlinkBatchRuleSets {
new CoerceInputsRule(classOf[LogicalIntersect], false),
//ensure except set operator have the same row type
new CoerceInputsRule(classOf[LogicalMinus], false),
- ConvertToNotInOrInRule.INSTANCE
+ ConvertToNotInOrInRule.INSTANCE,
+ // optimize limit 0
+ FlinkLimit0RemoveRule.INSTANCE
)).asJava)
/**
@@ -159,7 +162,7 @@ object FlinkBatchRuleSets {
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
- PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+ FlinkPruneEmptyRules.JOIN_RIGHT_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE
@@ -260,7 +263,11 @@ object FlinkBatchRuleSets {
// semi/anti join transpose rule
FlinkSemiAntiJoinJoinTransposeRule.INSTANCE,
FlinkSemiAntiJoinProjectTransposeRule.INSTANCE,
- FlinkSemiAntiJoinFilterTransposeRule.INSTANCE
+ FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,
+
+ // set operators
+ ReplaceIntersectWithSemiJoinRule.INSTANCE,
+ ReplaceMinusWithAntiJoinRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index ce99a98..419306e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -34,6 +34,7 @@ object FlinkStreamRuleSets {
val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
+ FlinkRewriteSubQueryRule.FILTER,
FlinkSubQueryRemoveRule.FILTER,
JoinConditionTypeCoerceRule.INSTANCE,
FlinkJoinPushExpressionsRule.INSTANCE
@@ -118,7 +119,9 @@ object FlinkStreamRuleSets {
new CoerceInputsRule(classOf[LogicalIntersect], false),
//ensure except set operator have the same row type
new CoerceInputsRule(classOf[LogicalMinus], false),
- ConvertToNotInOrInRule.INSTANCE
+ ConvertToNotInOrInRule.INSTANCE,
+ // optimize limit 0
+ FlinkLimit0RemoveRule.INSTANCE
)
).asJava)
@@ -157,7 +160,7 @@ object FlinkStreamRuleSets {
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.FILTER_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
- PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+ FlinkPruneEmptyRules.JOIN_RIGHT_INSTANCE,
PruneEmptyRules.PROJECT_INSTANCE,
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.UNION_INSTANCE
@@ -232,7 +235,11 @@ object FlinkStreamRuleSets {
// semi/anti join transpose rule
FlinkSemiAntiJoinJoinTransposeRule.INSTANCE,
FlinkSemiAntiJoinProjectTransposeRule.INSTANCE,
- FlinkSemiAntiJoinFilterTransposeRule.INSTANCE
+ FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,
+
+ // set operators
+ ReplaceIntersectWithSemiJoinRule.INSTANCE,
+ ReplaceMinusWithAntiJoinRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
index 8deddfb..550c318 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.tools.RelBuilderFactory
import scala.collection.JavaConversions._
/**
- * This rules is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]].
+ * This rule is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]].
*
* Modification:
* - Condition in the merged program will be simplified if it exists.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRule.scala
new file mode 100644
index 0000000..73190fa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rex.RexLiteral
+
+/**
+ * Planner rule that rewrites `limit 0` to empty [[org.apache.calcite.rel.core.Values]].
+ */
+class FlinkLimit0RemoveRule extends RelOptRule(
+ operand(classOf[Sort], any()),
+ "FlinkLimit0RemoveRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val sort: Sort = call.rel(0)
+ sort.fetch != null && RexLiteral.intValue(sort.fetch) == 0
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val sort: Sort = call.rel(0)
+ val emptyValues = call.builder().values(sort.getRowType).build()
+ call.transformTo(emptyValues)
+
+ // New plan is absolutely better than old plan.
+ call.getPlanner.setImportance(sort, 0.0)
+ }
+}
+
+object FlinkLimit0RemoveRule {
+ val INSTANCE = new FlinkLimit0RemoveRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala
new file mode 100644
index 0000000..f85087b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType, Values}
+
+object FlinkPruneEmptyRules {
+
+ /**
+ * This rule is copied from Calcite's
+ * [[org.apache.calcite.rel.rules.PruneEmptyRules#JOIN_RIGHT_INSTANCE]].
+ *
+ * Modification:
+ * - An ANTI join whose right child is empty is replaced by its left child instead of empty.
+ *
+ * Rule that converts a [[Join]] to empty if its right child is empty.
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ * <li>Join(Scan(Emp), Empty, INNER) becomes Empty
+ * </ul>
+ */
+ val JOIN_RIGHT_INSTANCE: RelOptRule = new RelOptRule(
+ operand(classOf[Join],
+ some(operand(classOf[RelNode], any),
+ operand(classOf[Values], none))),
+ "FlinkPruneEmptyRules(right)") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val right: Values = call.rel(2)
+ Values.IS_EMPTY.apply(right)
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val join: Join = call.rel(0)
+ join.getJoinType match {
+ case JoinRelType.ANTI =>
+ // "select * from emp where deptno not in (select deptno from dept where 1=0)"
+ // return emp
+ call.transformTo(call.builder().push(join.getLeft).build)
+ case _ =>
+ if (join.getJoinType.generatesNullsOnRight) {
+ // "select * from emp left join dept" is not necessarily empty if dept is empty
+ } else {
+ call.transformTo(call.builder.push(join).empty.build)
+ }
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala
new file mode 100644
index 0000000..8d97e34
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operandJ}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Aggregate, Filter, RelFactories}
+import org.apache.calcite.rex.{RexShuttle, _}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.`type`.SqlTypeFamily
+import org.apache.calcite.sql.fun.SqlCountAggFunction
+import org.apache.calcite.tools.RelBuilderFactory
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that rewrites scalar query in filter like:
+ * `select * from T1 where (select count(*) from T2) > 0`
+ * to
+ * `select * from T1 where exists (select * from T2)`,
+ * which could be converted to SEMI join by [[FlinkSubQueryRemoveRule]].
+ *
+ * Without this rule, the original query will be rewritten to a filter on a join on an aggregate
+ * by [[org.apache.calcite.rel.rules.SubQueryRemoveRule]]. The full logical plan is:
+ * {{{
+ * LogicalProject(a=[$0], b=[$1], c=[$2])
+ * +- LogicalJoin(condition=[$3], joinType=[semi])
+ * :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ * +- LogicalProject($f0=[IS NOT NULL($0)])
+ * +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ * +- LogicalProject(i=[true])
+ * +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+ * }}}
+ */
+class FlinkRewriteSubQueryRule(
+ operand: RelOptRuleOperand,
+ relBuilderFactory: RelBuilderFactory,
+ description: String)
+ extends RelOptRule(operand, relBuilderFactory, description) {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val filter: Filter = call.rel(0)
+ val condition = filter.getCondition
+ val newCondition = rewriteScalarQuery(condition)
+ if (RexUtil.eq(condition, newCondition)) {
+ return
+ }
+
+ val newFilter = filter.copy(filter.getTraitSet, filter.getInput, newCondition)
+ call.transformTo(newFilter)
+ }
+
+ // scalar query like: `(select count(*) from T) > 0` can be converted to `exists(select * from T)`
+ def rewriteScalarQuery(condition: RexNode): RexNode = {
+ condition.accept(new RexShuttle() {
+ override def visitCall(call: RexCall): RexNode = {
+ val subQuery = getSupportedScalarQuery(call)
+ subQuery match {
+ case Some(sq) =>
+ val aggInput = sq.rel.getInput(0)
+ RexSubQuery.exists(aggInput)
+ case _ => super.visitCall(call)
+ }
+ }
+ })
+ }
+
+ private def isScalarQuery(n: RexNode): Boolean = n.isA(SqlKind.SCALAR_QUERY)
+
+ private def getSupportedScalarQuery(call: RexCall): Option[RexSubQuery] = {
+ // check the RexNode is a RexLiteral whose value is between 0 and 1
+ def isBetween0And1(n: RexNode, include0: Boolean, include1: Boolean): Boolean = {
+ n match {
+ case l: RexLiteral =>
+ l.getTypeName.getFamily match {
+ case SqlTypeFamily.NUMERIC if l.getValue != null =>
+ val v = l.getValue.toString.toDouble
+ (0.0 < v && v < 1.0) || (include0 && v == 0.0) || (include1 && v == 1.0)
+ case _ => false
+ }
+ case _ => false
+ }
+ }
+
+ // check the RelNode is an Aggregate whose only aggregate call is a count with empty args
+ def isCountStarAggWithoutGroupBy(n: RelNode): Boolean = {
+ n match {
+ case agg: Aggregate =>
+ if (agg.getGroupCount == 0 && agg.getAggCallList.size() == 1) {
+ val aggCall = agg.getAggCallList.head
+ !aggCall.isDistinct &&
+ aggCall.filterArg < 0 &&
+ aggCall.getArgList.isEmpty &&
+ aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]
+ } else {
+ false
+ }
+ case _ => false
+ }
+ }
+
+ call.getKind match {
+ // (select count(*) from T) > X (X is between 0 (inclusive) and 1 (exclusive))
+ case SqlKind.GREATER_THAN if isScalarQuery(call.operands.head) =>
+ val subQuery = call.operands.head.asInstanceOf[RexSubQuery]
+ if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+ isBetween0And1(call.operands.last, include0 = true, include1 = false)) {
+ Some(subQuery)
+ } else {
+ None
+ }
+ // (select count(*) from T) >= X (X is between 0 (exclusive) and 1 (inclusive))
+ case SqlKind.GREATER_THAN_OR_EQUAL if isScalarQuery(call.operands.head) =>
+ val subQuery = call.operands.head.asInstanceOf[RexSubQuery]
+ if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+ isBetween0And1(call.operands.last, include0 = false, include1 = true)) {
+ Some(subQuery)
+ } else {
+ None
+ }
+ // X < (select count(*) from T) (X is between 0 (inclusive) and 1 (exclusive))
+ case SqlKind.LESS_THAN if isScalarQuery(call.operands.last) =>
+ val subQuery = call.operands.last.asInstanceOf[RexSubQuery]
+ if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+ isBetween0And1(call.operands.head, include0 = true, include1 = false)) {
+ Some(subQuery)
+ } else {
+ None
+ }
+ // X <= (select count(*) from T) (X is between 0 (exclusive) and 1 (inclusive))
+ case SqlKind.LESS_THAN_OR_EQUAL if isScalarQuery(call.operands.last) =>
+ val subQuery = call.operands.last.asInstanceOf[RexSubQuery]
+ if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+ isBetween0And1(call.operands.head, include0 = false, include1 = true)) {
+ Some(subQuery)
+ } else {
+ None
+ }
+ case _ => None
+ }
+ }
+}
+
+object FlinkRewriteSubQueryRule {
+
+ val FILTER = new FlinkRewriteSubQueryRule(
+ operandJ(classOf[Filter], null, RexUtil.SubQueryFinder.FILTER_PREDICATE, any),
+ RelFactories.LOGICAL_BUILDER,
+ "FlinkRewriteSubQueryRule:Filter")
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
new file mode 100644
index 0000000..6ac7dea
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Aggregate, Intersect, Join, JoinRelType}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that replaces distinct [[Intersect]] with
+ * a distinct [[Aggregate]] on a SEMI [[Join]].
+ *
+ * <p>Note: Intersect All is not supported.
+ */
+class ReplaceIntersectWithSemiJoinRule extends ReplaceSetOpWithJoinRuleBase(
+ classOf[Intersect],
+ "ReplaceIntersectWithSemiJoinRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val intersect: Intersect = call.rel(0)
+ // intersect all is not supported yet.
+ intersect.isDistinct
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val intersect: Intersect = call.rel(0)
+ val left = intersect.getInput(0)
+ val right = intersect.getInput(1)
+
+ val relBuilder = call.builder
+ val keys = 0 until left.getRowType.getFieldCount
+ val conditions = generateCondition(relBuilder, left, right, keys)
+
+ relBuilder.push(left)
+ relBuilder.push(right)
+ relBuilder.join(JoinRelType.SEMI, conditions).aggregate(relBuilder.groupKey(keys: _*))
+ val rel = relBuilder.build()
+ call.transformTo(rel)
+ }
+}
+
+object ReplaceIntersectWithSemiJoinRule {
+ val INSTANCE: RelOptRule = new ReplaceIntersectWithSemiJoinRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
new file mode 100644
index 0000000..c322cf9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Planner rule that replaces distinct [[Minus]] (SQL keyword: EXCEPT) with
+ * a distinct [[Aggregate]] on an ANTI [[Join]].
+ *
+ * <p>Note: Minus All is not supported.
+ */
+class ReplaceMinusWithAntiJoinRule extends ReplaceSetOpWithJoinRuleBase(
+ classOf[Minus],
+ "ReplaceMinusWithAntiJoinRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val minus: Minus = call.rel(0)
+ // minus all is not supported yet.
+ minus.isDistinct
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val minus: Minus = call.rel(0)
+ val left = minus.getInput(0)
+ val right = minus.getInput(1)
+
+ val relBuilder = call.builder
+ val keys = 0 until left.getRowType.getFieldCount
+ val conditions = generateCondition(relBuilder, left, right, keys)
+
+ relBuilder.push(left)
+ relBuilder.push(right)
+ relBuilder.join(JoinRelType.ANTI, conditions).aggregate(relBuilder.groupKey(keys: _*))
+ val rel = relBuilder.build()
+ call.transformTo(rel)
+ }
+}
+
+object ReplaceMinusWithAntiJoinRule {
+ val INSTANCE: RelOptRule = new ReplaceMinusWithAntiJoinRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
new file mode 100644
index 0000000..1f400a0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptUtil}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{RelFactories, SetOp}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+/**
+ * Base class for rules that replace a [[SetOp]] with a [[org.apache.calcite.rel.core.Join]].
+ */
+abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
+ clazz: Class[T],
+ description: String)
+ extends RelOptRule(
+ operand(clazz, any),
+ RelFactories.LOGICAL_BUILDER,
+ description) {
+
+ protected def generateCondition(
+ relBuilder: RelBuilder,
+ left: RelNode,
+ right: RelNode,
+ keys: Seq[Int]): Seq[RexNode] = {
+ val rexBuilder = relBuilder.getRexBuilder
+ val leftTypes = RelOptUtil.getFieldTypeList(left.getRowType)
+ val rightTypes = RelOptUtil.getFieldTypeList(right.getRowType)
+ val conditions = keys.map { key =>
+ val leftRex = rexBuilder.makeInputRef(leftTypes.get(key), key)
+ val rightRex = rexBuilder.makeInputRef(rightTypes.get(key), leftTypes.size + key)
+ val equalCond = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, leftRex, rightRex)
+ relBuilder.or(
+ equalCond,
+ relBuilder.and(relBuilder.isNull(leftRex), relBuilder.isNull(rightRex)))
+ }
+ conditions
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml
index 731f1b5..54f0292 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml
@@ -29,11 +29,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[0], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, c])
]]>
</Resource>
</TestCase>
@@ -92,11 +88,7 @@ LogicalSort(offset=[10], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[0], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[10], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, c])
]]>
</Resource>
</TestCase>
@@ -134,11 +126,7 @@ LogicalSort(offset=[0], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0], global=[true])
- +- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[0], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, c])
]]>
</Resource>
</TestCase>
@@ -155,10 +143,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Limit(offset=[0], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
- +- Limit(offset=[0], fetch=[0], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
new file mode 100644
index 0000000..a3fada8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -0,0 +1,224 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testIntersect">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[c], select=[c])
++- HashJoin(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
+ :- Exchange(distribution=[hash[c]])
+ : +- Calc(select=[c])
+ : +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[f]])
+ +- Calc(select=[f])
+ +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectLeftIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalFilter(condition=[=(1, 0)])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinus">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[c], select=[c])
++- HashJoin(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
+ :- Exchange(distribution=[hash[c]])
+ : +- Calc(select=[c])
+ : +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[f]])
+ +- Calc(select=[f])
+ +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinusLeftIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalFilter(condition=[=(1, 0)])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]], values=[c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinusRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+ +- LocalHashAggregate(groupBy=[c], select=[c])
+ +- Calc(select=[c])
+ +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinusWithNestedTypes">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(a=[$0], b=[$1], c=[$2])
+: +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
++- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c], build=[left])
+ :- Exchange(distribution=[hash[a, b, c]], exchange_mode=[BATCH], reuse_id=[1])
+ : +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUnionNullableTypes">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+: +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(EXPR$0=[CASE(>($2, 0), $1, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2))])
+ +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+: +- TableSourceScan(table=[[A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
++- Calc(select=[CASE(>(c, 0), b, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2)) AS EXPR$0])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUnionAnyType">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM A UNION ALL SELECT b FROM A]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+: +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
++- LogicalProject(b=[$1])
+ +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+: +- TableSourceScan(table=[[A, source: [TestTableSource(a, b)]]], fields=[a, b], reuse_id=[1])
++- Calc(select=[b])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml
index e4f1be9..94c59b3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml
@@ -49,10 +49,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortLimit(orderBy=[a DESC, b ASC], offset=[1], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
- +- SortLimit(orderBy=[a DESC, b ASC], offset=[0], fetch=[1], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -88,10 +85,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
- +- SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -147,10 +141,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
- +- SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -207,10 +198,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
</Resource>
<Resource name="planAfter">
<![CDATA[
-SortLimit(orderBy=[a DESC, b ASC], offset=[1], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
- +- SortLimit(orderBy=[a DESC, b ASC], offset=[0], fetch=[1], global=[false])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index a1cc277..a1cbb1a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -1069,4 +1069,43 @@ SortMergeJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f,
]]>
</Resource>
</TestCase>
+ <TestCase name="testSubplanReuseWithDynamicFunction">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalIntersect(all=[false])
+: :- LogicalProject(random=[$0])
+: : +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+: : +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+: : +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+: +- LogicalProject(random=[$0])
+: +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+: +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+: +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(random=[$0])
+ +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+ +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+:- SortAggregate(isMerge=[false], groupBy=[random], select=[random])
+: +- Sort(orderBy=[random ASC])
+: +- Exchange(distribution=[hash[random]])
+: +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+: :- Exchange(distribution=[any], exchange_mode=[BATCH])
+: : +- Calc(select=[random], reuse_id=[1])
+: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
+: : +- Exchange(distribution=[single])
+: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[false])
+: : +- Calc(select=[a AS random, RAND() AS EXPR$1])
+: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+: +- Exchange(distribution=[broadcast], reuse_id=[2])
+: +- Reused(reference_id=[1])
++- Reused(reference_id=[2])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
new file mode 100644
index 0000000..2a2949e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testLimitZeroWithExists">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE EXISTS (SELECT a FROM MyTable LIMIT 0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalSort(fetch=[0])
+ LogicalProject(a=[$0])
+ LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+})])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithNotIn">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a NOT IN (SELECT a FROM MyTable LIMIT 0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalSort(fetch=[0])
+ LogicalProject(a=[$0])
+ LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+}))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[OR(=($0, $3), IS NULL($0), IS NULL($3))], joinType=[anti])
+ :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithIn">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a IN (SELECT a FROM MyTable LIMIT 0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalSort(fetch=[0])
+ LogicalProject(a=[$0])
+ LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+})])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable INNER JOIN (SELECT * FROM MyTable Limit 0) ON TRUE]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalSort(fetch=[0])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithNotExists">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalSort(fetch=[0])
+ LogicalProject(a=[$0])
+ LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+}))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[anti])
+ :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimpleLimitZero">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable LIMIT 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithOffset">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable ORDER BY a LIMIT 0 OFFSET 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], offset=[10], fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithSelect">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM (SELECT a FROM MyTable LIMIT 0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalSort(fetch=[0])
+ +- LogicalProject(a=[$0])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLimitZeroWithOrderBy">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable ORDER BY a LIMIT 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
new file mode 100644
index 0000000..ddc7d13
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testSemiJoinRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE a IN (SELECT d FROM T2 WHERE 1=0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+ LogicalFilter(condition=[=(1, 0)])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+})])
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalValues(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAntiJoinRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE a NOT IN (SELECT d FROM T2 WHERE 1=0)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(d=[$0])
+ LogicalFilter(condition=[=(1, 0)])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+}))])
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
new file mode 100644
index 0000000..0d5a4ec
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testIntersect">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
+ :- LogicalProject(c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectLeftIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalFilter(condition=[=(1, 0)])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
+ :- LogicalProject(c=[$2])
+ : +- LogicalFilter(condition=[=(1, 0)])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
+ :- LogicalProject(c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectWithFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM ((SELECT * FROM T1) INTERSECT (SELECT * FROM T2)) WHERE a > 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, 1)])
+ +- LogicalIntersect(all=[false])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1], f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, 1)])
+ +- LogicalAggregate(group=[{0, 1, 2}])
+ +- LogicalJoin(condition=[AND(OR(=($0, $3), AND(IS NULL($0), IS NULL($3))), OR(=($1, $4), AND(IS NULL($1), IS NULL($4))), OR(=($2, $5), AND(IS NULL($2), IS NULL($5))))], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1], f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
new file mode 100644
index 0000000..02494da
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testExcept">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
+ :- LogicalProject(c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExceptLeftIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalFilter(condition=[=(1, 0)])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
+ :- LogicalProject(c=[$2])
+ : +- LogicalFilter(condition=[=(1, 0)])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExceptRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
+ :- LogicalProject(c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testExceptWithFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM (SELECT * FROM T1 EXCEPT (SELECT * FROM T2)) WHERE b < 2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[<($1, 2)])
+ +- LogicalMinus(all=[false])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1], f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[<($1, 2)])
+ +- LogicalAggregate(group=[{0, 1, 2}])
+ +- LogicalJoin(condition=[AND(OR(=($0, $3), AND(IS NULL($0), IS NULL($3))), OR(=($1, $4), AND(IS NULL($1), IS NULL($4))), OR(=($2, $5), AND(IS NULL($2), IS NULL($5))))], joinType=[anti])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1], f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
new file mode 100644
index 0000000..a4ef320
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
@@ -0,0 +1,612 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testNotCountStarInScalarQuery">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(e) FROM y WHERE d > 10) > 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
+ LogicalProject(e=[$1])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[>($3, 0)])
+ +- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
+ +- LogicalProject(e=[$1])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNotEmptyGroupByInScalarQuery">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10 GROUP BY f) > 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalProject(EXPR$0=[$1])
+ LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+ LogicalProject(f=[$2])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[>($3, 0)])
+ +- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
+ +- LogicalProject(EXPR$0=[$1])
+ +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+ +- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSqlFromTpcDsQ41">
+ <Resource name="sql">
+ <![CDATA[
+SELECT DISTINCT (i_product_name)
+FROM item i1
+WHERE i_manufact_id BETWEEN 738 AND 738 + 40
+ AND (SELECT count(*) AS item_cnt
+FROM item
+WHERE (i_manufact = i1.i_manufact AND
+ ((i_category = 'Women' AND
+ (i_color = 'powder' OR i_color = 'khaki') AND
+ (i_units = 'Ounce' OR i_units = 'Oz') AND
+ (i_size = 'medium' OR i_size = 'extra large')
+ ) OR
+ (i_category = 'Women' AND
+ (i_color = 'brown' OR i_color = 'honeydew') AND
+ (i_units = 'Bunch' OR i_units = 'Ton') AND
+ (i_size = 'N/A' OR i_size = 'small')
+ ) OR
+ (i_category = 'Men' AND
+ (i_color = 'floral' OR i_color = 'deep') AND
+ (i_units = 'N/A' OR i_units = 'Dozen') AND
+ (i_size = 'petite' OR i_size = 'large')
+ ) OR
+ (i_category = 'Men' AND
+ (i_color = 'light' OR i_color = 'cornflower') AND
+ (i_units = 'Box' OR i_units = 'Pound') AND
+ (i_size = 'medium' OR i_size = 'extra large')
+ ))) OR
+ (i_manufact = i1.i_manufact AND
+ ((i_category = 'Women' AND
+ (i_color = 'midnight' OR i_color = 'snow') AND
+ (i_units = 'Pallet' OR i_units = 'Gross') AND
+ (i_size = 'medium' OR i_size = 'extra large')
+ ) OR
+ (i_category = 'Women' AND
+ (i_color = 'cyan' OR i_color = 'papaya') AND
+ (i_units = 'Cup' OR i_units = 'Dram') AND
+ (i_size = 'N/A' OR i_size = 'small')
+ ) OR
+ (i_category = 'Men' AND
+ (i_color = 'orange' OR i_color = 'frosted') AND
+ (i_units = 'Each' OR i_units = 'Tbl') AND
+ (i_size = 'petite' OR i_size = 'large')
+ ) OR
+ (i_category = 'Men' AND
+ (i_color = 'forest' OR i_color = 'ghost') AND
+ (i_units = 'Lb' OR i_units = 'Bundle') AND
+ (i_size = 'medium' OR i_size = 'extra large')
+ )))) > 0
+ORDER BY i_product_name
+LIMIT 100
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100])
++- LogicalAggregate(group=[{0}])
+ +- LogicalProject(i_product_name=[$2])
+ +- LogicalFilter(condition=[AND(>=($0, 738), <=($0, +(738, 40)), >($SCALAR_QUERY({
+LogicalAggregate(group=[{}], item_cnt=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[OR(AND(=($1, $cor0.i_manufact), OR(AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, _UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, _UTF-16LE'extra large'))), AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), =($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF [...]
+ LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+}), 0))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100])
++- LogicalAggregate(group=[{0}])
+ +- LogicalProject(i_product_name=[$2])
+ +- LogicalFilter(condition=[AND(>=($0, 738), <=($0, +(738, 40)))])
+ +- LogicalJoin(condition=[=($7, $1)], joinType=[semi])
+ :- LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+ +- LogicalProject(i_manufact=[$1])
+ +- LogicalFilter(condition=[OR(AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, _UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, _UTF-16LE'extra large'))), AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), =($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF-16LE'floral'), =( [...]
+ +- LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0.9]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.9:DECIMAL(2, 1))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 1)], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 0.1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.1:DECIMAL(2, 1))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation5">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE a = d)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation6">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE a = d)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0.99:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation7">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE a = d)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(1, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithCorrelation8">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE a = d)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(0.01:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0.9]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.9:DECIMAL(2, 1))])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 1)])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 0.1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.1:DECIMAL(2, 1))])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation5">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation6">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0.99:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation7">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(1, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSupportedConversionWithoutCorrelation8">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(0.01:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+ :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml
index 285ee01..3b6ffc6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml
@@ -29,10 +29,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -89,10 +86,7 @@ LogicalSort(offset=[10], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -109,10 +103,7 @@ LogicalSort(offset=[0], fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -129,10 +120,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -149,10 +137,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(proctime) AS proctime, c])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -169,10 +154,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(proctime) AS desc, c])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -189,10 +171,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[c, PROCTIME_MATERIALIZE(proctime) AS desc])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -209,10 +188,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[c, PROCTIME_MATERIALIZE(proctime) AS proctime])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -229,10 +205,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[rowtime, c])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -249,10 +222,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[rowtime AS desc, c])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -269,10 +239,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[c, rowtime AS desc])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -289,10 +256,7 @@ LogicalSort(fetch=[0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[c, rowtime])
-+- Limit(offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
new file mode 100644
index 0000000..d30ecdd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testIntersect">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+ +- Join(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c])
+ :- Exchange(distribution=[hash[c]])
+ : +- Calc(select=[c])
+ : +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[f]])
+ +- Calc(select=[f])
+ +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectLeftIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalFilter(condition=[=(1, 0)])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testIntersectRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinus">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+ +- Join(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c])
+ :- Exchange(distribution=[hash[c]])
+ : +- Calc(select=[c])
+ : +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[f]])
+ +- Calc(select=[f])
+ +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinusLeftIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalFilter(condition=[=(1, 0)])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Values(tuples=[[]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinusRightIsEmpty">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+: +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+ +- Calc(select=[c])
+ +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMinusWithNestedTypes">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(a=[$0], b=[$1], c=[$2])
+: +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[a, b, c], select=[a, b, c])
++- Exchange(distribution=[hash[a, b, c]])
+ +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c])
+ :- Exchange(distribution=[hash[a, b, c]], reuse_id=[1])
+ : +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUnionNullableTypes">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+: +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(EXPR$0=[CASE(>($2, 0), $1, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2))])
+ +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+: +- TableSourceScan(table=[[A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
++- Calc(select=[CASE(>(c, 0), b, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2)) AS EXPR$0])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUnionAnyType">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM A UNION ALL SELECT b FROM A]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+: +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
++- LogicalProject(b=[$1])
+ +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+: +- TableSourceScan(table=[[A, source: [TestTableSource(a, b)]]], fields=[a, b], reuse_id=[1])
++- Calc(select=[b])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml
index 0faf861..256ca26 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml
@@ -217,10 +217,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[proctime ASC, c ASC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -259,10 +256,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[proctime ASC, c ASC], offset=[1], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -301,10 +295,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[proctime DESC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -427,10 +418,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC, proctime DESC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -574,10 +562,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -616,10 +601,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -658,10 +640,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC], offset=[1], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -784,10 +763,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime DESC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -826,10 +802,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime DESC], offset=[1], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -868,10 +841,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC, c ASC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -910,10 +880,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC, c ASC], offset=[1], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -952,10 +919,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime DESC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -994,10 +958,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC, c DESC], offset=[1], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -1036,10 +997,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC, rowtime ASC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -1078,10 +1036,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC, rowtime DESC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
@@ -1225,10 +1180,7 @@ LogicalProject(a=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[0])
- +- Exchange(distribution=[single])
- +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
index 07639da..9f1f894 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
@@ -647,4 +647,41 @@ Union(all=[true], union=[a, b])
]]>
</Resource>
</TestCase>
+ <TestCase name="testSubplanReuseWithDynamicFunction">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalIntersect(all=[false])
+: :- LogicalProject(random=[$0])
+: : +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+: : +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+: : +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+: +- LogicalProject(random=[$0])
+: +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+: +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+: +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(random=[$0])
+ +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+ +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+ +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random])
+:- Exchange(distribution=[hash[random]])
+: +- GroupAggregate(groupBy=[random], select=[random])
+: +- Exchange(distribution=[hash[random]])
+: +- Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random])
+: :- Exchange(distribution=[hash[random]], reuse_id=[1])
+: : +- Calc(select=[random])
+: : +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1])
+: : +- Exchange(distribution=[single])
+: : +- Calc(select=[a AS random, RAND() AS EXPR$1])
+: : +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+: +- Reused(reference_id=[1])
++- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala
index 0cfcfe7..9f5a4e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala
@@ -28,7 +28,6 @@ class LimitTest extends TableTestBase {
private val util = batchTestUtil()
util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- // TODO optimize `limit 0`
@Test
def testLimitWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..635b9b7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.batch.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableConfigOptions, TableException, ValidationException}
+import org.apache.flink.table.plan.util.NonPojo
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+class SetOperatorsTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def before(): Unit = {
+ util.tableEnv.getConfig.getConf.setString(
+ TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS, "SortAgg")
+ util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+ util.addTableSource[(Int, Long, Int, String, Long)]("T3", 'a, 'b, 'd, 'c, 'e)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionDifferentColumnSize(): Unit = {
+ // must fail. Union inputs have different column size.
+ util.verifyPlan("SELECT * FROM T1 UNION ALL SELECT * FROM T3")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionDifferentFieldTypes(): Unit = {
+ // must fail. Union inputs have different field types.
+ util.verifyPlan("SELECT a, b, c FROM T1 UNION ALL SELECT d, c, e FROM T3")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testIntersectAll(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT ALL SELECT f FROM T2")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testIntersectDifferentFieldTypes(): Unit = {
+ // must fail. Intersect inputs have different field types.
+ util.verifyPlan("SELECT a, b, c FROM T1 INTERSECT SELECT d, c, e FROM T3")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testMinusAll(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT ALL SELECT f FROM T2")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testMinusDifferentFieldTypes(): Unit = {
+ // must fail. Minus inputs have different field types.
+ util.verifyPlan("SELECT a, b, c FROM T1 EXCEPT SELECT d, c, e FROM T3")
+ }
+
+ @Test
+ def testIntersect(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2")
+ }
+
+ @Test
+ def testIntersectLeftIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2")
+ }
+
+ @Test
+ def testIntersectRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0")
+ }
+
+ @Test
+ def testMinus(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2")
+ }
+
+ @Test
+ def testMinusLeftIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2")
+ }
+
+ @Test
+ def testMinusRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0")
+ }
+
+ @Test
+ def testMinusWithNestedTypes(): Unit = {
+ util.addTableSource[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+ util.verifyPlan("SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable")
+ }
+
+ @Test
+ def testUnionNullableTypes(): Unit = {
+ util.addTableSource[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+ util.verifyPlan("SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A")
+ }
+
+ @Test
+ def testUnionAnyType(): Unit = {
+ val util = batchTestUtil()
+ util.addTableSource("A",
+ Array[TypeInformation[_]](
+ new GenericTypeInfo(classOf[NonPojo]),
+ new GenericTypeInfo(classOf[NonPojo])),
+ Array("a", "b"))
+ util.verifyPlan("SELECT a FROM A UNION ALL SELECT b FROM A")
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
index 4ebdf44..4aff8d0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
@@ -29,7 +29,6 @@ class SortLimitTest extends TableTestBase {
private val util = batchTestUtil()
util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
util.tableEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
- // TODO optimize `limit 0`
@Test
def testNonRangeSortWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
index f85d8de..5de63bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
@@ -368,8 +368,7 @@ class SubplanReuseTest extends TableTestBase {
util.verifyPlan(sqlQuery)
}
- @Test(expected = classOf[TableException])
- // INTERSECT is not supported now
+ @Test
def testSubplanReuseWithDynamicFunction(): Unit = {
val sqlQuery = util.tableEnv.sqlQuery(
"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.scala
new file mode 100644
index 0000000..60af860
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[FlinkLimit0RemoveRule]].
+ */
+class FlinkLimit0RemoveRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+ programs.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+ FlinkSubQueryRemoveRule.FILTER,
+ FlinkLimit0RemoveRule.INSTANCE))
+ .build()
+ )
+ val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+ .replaceBatchProgram(programs).build()
+ util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+ util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ }
+
+ @Test
+ def testSimpleLimitZero(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable LIMIT 0")
+ }
+
+ @Test
+ def testLimitZeroWithOrderBy(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable ORDER BY a LIMIT 0")
+ }
+
+ @Test
+ def testLimitZeroWithOffset(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable ORDER BY a LIMIT 0 OFFSET 10")
+ }
+
+ @Test
+ def testLimitZeroWithSelect(): Unit = {
+ util.verifyPlan("SELECT * FROM (SELECT a FROM MyTable LIMIT 0)")
+ }
+
+ @Test
+ def testLimitZeroWithIn(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable WHERE a IN (SELECT a FROM MyTable LIMIT 0)")
+ }
+
+ @Test
+ def testLimitZeroWithNotIn(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable WHERE a NOT IN (SELECT a FROM MyTable LIMIT 0)")
+ }
+
+ @Test
+ def testLimitZeroWithExists(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable WHERE EXISTS (SELECT a FROM MyTable LIMIT 0)")
+ }
+
+ @Test
+ def testLimitZeroWithNotExists(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)")
+ }
+
+ @Test
+ def testLimitZeroWithJoin(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable INNER JOIN (SELECT * FROM MyTable Limit 0) ON TRUE")
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
new file mode 100644
index 0000000..3f15f18
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.{PruneEmptyRules, ReduceExpressionsRule}
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[FlinkPruneEmptyRules]].
+ */
+class FlinkPruneEmptyRulesTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+ programs.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+ FlinkSubQueryRemoveRule.FILTER,
+ ReduceExpressionsRule.FILTER_INSTANCE,
+ ReduceExpressionsRule.PROJECT_INSTANCE,
+ PruneEmptyRules.FILTER_INSTANCE,
+ PruneEmptyRules.PROJECT_INSTANCE,
+ FlinkPruneEmptyRules.JOIN_RIGHT_INSTANCE))
+ .build()
+ )
+ val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+ .replaceBatchProgram(programs).build()
+ util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+ util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testSemiJoinRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT * FROM T1 WHERE a IN (SELECT d FROM T2 WHERE 1=0)")
+ }
+
+ @Test
+ def testAntiJoinRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT * FROM T1 WHERE a NOT IN (SELECT d FROM T2 WHERE 1=0)")
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala
new file mode 100644
index 0000000..c867de8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram,
+ FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[ReplaceIntersectWithSemiJoinRule]].
+ */
+class ReplaceIntersectWithSemiJoinRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+ programs.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(ReplaceIntersectWithSemiJoinRule.INSTANCE))
+ .build()
+ )
+ val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+ .replaceBatchProgram(programs).build()
+ util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+
+ util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testIntersectAll(): Unit = {
+ util.verifyPlanNotExpected("SELECT c FROM T1 INTERSECT ALL SELECT f FROM T2", "joinType=[semi]")
+ }
+
+ @Test
+ def testIntersect(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2")
+ }
+
+ @Test
+ def testIntersectWithFilter(): Unit = {
+ util.verifyPlan("SELECT c FROM ((SELECT * FROM T1) INTERSECT (SELECT * FROM T2)) WHERE a > 1")
+ }
+
+ @Test
+ def testIntersectLeftIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2")
+ }
+
+ @Test
+ def testIntersectRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0")
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
new file mode 100644
index 0000000..f5b6fbe
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[ReplaceMinusWithAntiJoinRule]].
+ */
+class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+ programs.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(ReplaceMinusWithAntiJoinRule.INSTANCE))
+ .build()
+ )
+ val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+ .replaceBatchProgram(programs).build()
+ util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+
+ util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testExceptAll(): Unit = {
+ util.verifyPlanNotExpected("SELECT c FROM T1 EXCEPT ALL SELECT f FROM T2", "joinType=[anti]")
+ }
+
+ @Test
+ def testExcept(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2")
+ }
+
+ @Test
+ def testExceptWithFilter(): Unit = {
+ util.verifyPlan("SELECT c FROM (SELECT * FROM T1 EXCEPT (SELECT * FROM T2)) WHERE b < 2")
+ }
+
+ @Test
+ def testExceptLeftIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2")
+ }
+
+ @Test
+ def testExceptRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0")
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.scala
new file mode 100644
index 0000000..255e2ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical.subquery
+
+import org.apache.flink.api.scala._
+
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[org.apache.flink.table.plan.rules.logical.FlinkRewriteSubQueryRule]].
+ */
+class FlinkRewriteSubQueryRuleTest extends SubQueryTestBase {
+
+ @Before
+ def setup(): Unit = {
+ util.addTableSource[(Int, Long, String)]("x", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("y", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testNotCountStarInScalarQuery(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(e) FROM y WHERE d > 10) > 0")
+ }
+
+ @Test
+ def testNotEmptyGroupByInScalarQuery(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10 GROUP BY f) > 0")
+ }
+
+ @Test
+ def testUnsupportedConversionWithUnexpectedComparisonNumber(): Unit = {
+ // without correlation
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 1", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 0", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > -1", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE 0 <= (SELECT COUNT(*) FROM y WHERE d > 10)", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE -1 < (SELECT COUNT(*) FROM y WHERE d > 10)", "joinType=[semi]")
+
+ // with correlation
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 1", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 0", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE 1 < (SELECT COUNT(*) FROM y WHERE a = d)", "joinType=[semi]")
+ util.verifyPlanNotExpected(
+ "SELECT * FROM x WHERE 0 <= (SELECT COUNT(*) FROM y WHERE a = d)", "joinType=[semi]")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation1(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation2(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0.9")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation3(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 1")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation4(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 0.1")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation5(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE d > 10)")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation6(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE d > 10)")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation7(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE d > 10)")
+ }
+
+ @Test
+ def testSupportedConversionWithoutCorrelation8(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE d > 10)")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation1(): Unit = {
+ // with correlation
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation2(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0.9")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation3(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 1")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation4(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 0.1")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation5(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE a = d)")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation6(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE a = d)")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation7(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE a = d)")
+ }
+
+ @Test
+ def testSupportedConversionWithCorrelation8(): Unit = {
+ util.verifyPlan("SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE a = d)")
+ }
+
+ @Test
+ def testSqlFromTpcDsQ41(): Unit = {
+ util.addTableSource[(Int, String, String, String, String, String, String)]("item",
+ 'i_manufact_id, 'i_manufact, 'i_product_name, 'i_category, 'i_color, 'i_units, 'i_size)
+ val sqlQuery =
+ """
+ |SELECT DISTINCT (i_product_name)
+ |FROM item i1
+ |WHERE i_manufact_id BETWEEN 738 AND 738 + 40
+ | AND (SELECT count(*) AS item_cnt
+ |FROM item
+ |WHERE (i_manufact = i1.i_manufact AND
+ | ((i_category = 'Women' AND
+ | (i_color = 'powder' OR i_color = 'khaki') AND
+ | (i_units = 'Ounce' OR i_units = 'Oz') AND
+ | (i_size = 'medium' OR i_size = 'extra large')
+ | ) OR
+ | (i_category = 'Women' AND
+ | (i_color = 'brown' OR i_color = 'honeydew') AND
+ | (i_units = 'Bunch' OR i_units = 'Ton') AND
+ | (i_size = 'N/A' OR i_size = 'small')
+ | ) OR
+ | (i_category = 'Men' AND
+ | (i_color = 'floral' OR i_color = 'deep') AND
+ | (i_units = 'N/A' OR i_units = 'Dozen') AND
+ | (i_size = 'petite' OR i_size = 'large')
+ | ) OR
+ | (i_category = 'Men' AND
+ | (i_color = 'light' OR i_color = 'cornflower') AND
+ | (i_units = 'Box' OR i_units = 'Pound') AND
+ | (i_size = 'medium' OR i_size = 'extra large')
+ | ))) OR
+ | (i_manufact = i1.i_manufact AND
+ | ((i_category = 'Women' AND
+ | (i_color = 'midnight' OR i_color = 'snow') AND
+ | (i_units = 'Pallet' OR i_units = 'Gross') AND
+ | (i_size = 'medium' OR i_size = 'extra large')
+ | ) OR
+ | (i_category = 'Women' AND
+ | (i_color = 'cyan' OR i_color = 'papaya') AND
+ | (i_units = 'Cup' OR i_units = 'Dram') AND
+ | (i_size = 'N/A' OR i_size = 'small')
+ | ) OR
+ | (i_category = 'Men' AND
+ | (i_color = 'orange' OR i_color = 'frosted') AND
+ | (i_units = 'Each' OR i_units = 'Tbl') AND
+ | (i_size = 'petite' OR i_size = 'large')
+ | ) OR
+ | (i_category = 'Men' AND
+ | (i_color = 'forest' OR i_color = 'ghost') AND
+ | (i_units = 'Lb' OR i_units = 'Bundle') AND
+ | (i_size = 'medium' OR i_size = 'extra large')
+ | )))) > 0
+ |ORDER BY i_product_name
+ |LIMIT 100
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala
index fe440f4..b98d7b2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala
@@ -28,7 +28,6 @@ class LimitTest extends TableTestBase {
private val util = streamTestUtil()
util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'proctime, 'rowtime)
- // TODO optimize `limit 0`
@Test
def testLimitWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..85f4d40
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.plan.util.NonPojo
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+class SetOperatorsTest extends TableTestBase {
+
+ private val util = streamTestUtil()
+
+ @Before
+ def before(): Unit = {
+ util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+ util.addTableSource[(Int, Long, Int, String, Long)]("T3", 'a, 'b, 'd, 'c, 'e)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionDifferentColumnSize(): Unit = {
+ // must fail. Union inputs have different column size.
+ util.verifyPlan("SELECT * FROM T1 UNION ALL SELECT * FROM T3")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionDifferentFieldTypes(): Unit = {
+ // must fail. Union inputs have different field types.
+ util.verifyPlan("SELECT a, b, c FROM T1 UNION ALL SELECT d, c, e FROM T3")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testIntersectAll(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT ALL SELECT f FROM T2")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testIntersectDifferentFieldTypes(): Unit = {
+ // must fail. Intersect inputs have different field types.
+ util.verifyPlan("SELECT a, b, c FROM T1 INTERSECT SELECT d, c, e FROM T3")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testMinusAll(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT ALL SELECT f FROM T2")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testMinusDifferentFieldTypes(): Unit = {
+ // must fail. Minus inputs have different field types.
+ util.verifyPlan("SELECT a, b, c FROM T1 EXCEPT SELECT d, c, e FROM T3")
+ }
+
+ @Test
+ def testIntersect(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2")
+ }
+
+ @Test
+ def testIntersectLeftIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2")
+ }
+
+ @Test
+ def testIntersectRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0")
+ }
+
+ @Test
+ def testMinus(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2")
+ }
+
+ @Test
+ def testMinusLeftIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2")
+ }
+
+ @Test
+ def testMinusRightIsEmpty(): Unit = {
+ util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0")
+ }
+
+ @Test
+ def testMinusWithNestedTypes(): Unit = {
+ util.addTableSource[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+ util.verifyPlan("SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable")
+ }
+
+ @Test
+ def testUnionNullableTypes(): Unit = {
+ util.addTableSource[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+ util.verifyPlan("SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A")
+ }
+
+ @Test
+ def testUnionAnyType(): Unit = {
+ val util = batchTestUtil() // NOTE(review): shadows the class-level stream util; confirm intent
+ util.addTableSource("A",
+ Array[TypeInformation[_]](
+ new GenericTypeInfo(classOf[NonPojo]),
+ new GenericTypeInfo(classOf[NonPojo])),
+ Array("a", "b"))
+ util.verifyPlan("SELECT a FROM A UNION ALL SELECT b FROM A")
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala
index 1633507..285fc46 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala
@@ -27,7 +27,6 @@ class SortLimitTest extends TableTestBase {
private val util = streamTestUtil()
util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'proctime, 'rowtime)
- // TODO optimize `limit 0`
@Test
def testSortProcessingTimeAscWithOffSet0AndLimit1(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
index a4db284..7613792 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
@@ -284,8 +284,7 @@ class SubplanReuseTest extends TableTestBase {
util.verifyPlan(sqlQuery)
}
- @Test(expected = classOf[TableException])
- // INTERSECT is not supported now
+ @Test
def testSubplanReuseWithDynamicFunction(): Unit = {
val sqlQuery = util.tableEnv.sqlQuery(
"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala
index 9704b45..5d2e62e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala
@@ -39,3 +39,13 @@ class MyPojo() {
override def toString = s"MyPojo($f1, $f2)"
}
+
+class NonPojo {
+ val x = new java.util.HashMap[String, String]()
+
+ override def toString: String = x.toString
+
+ override def hashCode(): Int = super.hashCode()
+
+ override def equals(obj: scala.Any): Boolean = super.equals(obj)
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
new file mode 100644
index 0000000..def6c29
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData.numericType
+
+import org.junit.{Before, Test}
+
+import java.math.{BigDecimal => JBigDecimal}
+
+import scala.collection.Seq
+
+
+class Limit0RemoveITCase extends BatchTestBase {
+
+ @Before
+ def before(): Unit = {
+ val numericData = Seq( // each row carries a null in a different column
+ row(null, 1L, 1.0f, 1.0d, JBigDecimal.valueOf(1)),
+ row(2, null, 2.0f, 2.0d, JBigDecimal.valueOf(2)),
+ row(3, 3L, null, 3.0d, JBigDecimal.valueOf(3)),
+ row(3, 3L, 4.0f, null, JBigDecimal.valueOf(3))
+ )
+
+ registerCollection("t1", numericData, numericType, "a, b, c, d, e") // same rows for both
+ registerCollection("t2", numericData, numericType, "a, b, c, d, e")
+ }
+
+ @Test
+ def testSimpleLimitRemove(): Unit = {
+ val sqlQuery = "SELECT * FROM t1 LIMIT 0"
+ checkResult(sqlQuery, Seq())
+ }
+
+ @Test
+ def testLimitRemoveWithOrderBy(): Unit = {
+ val sqlQuery = "SELECT * FROM t1 ORDER BY a LIMIT 0"
+ checkResult(sqlQuery, Seq())
+ }
+
+ @Test
+ def testLimitRemoveWithJoin(): Unit = {
+ val sqlQuery = "SELECT * FROM t1 JOIN (SELECT * FROM t2 LIMIT 0) ON true"
+ checkResult(sqlQuery, Seq())
+ }
+
+ @Test
+ def testLimitRemoveWithIn(): Unit = {
+ val sqlQuery = "SELECT * FROM t1 WHERE a IN (SELECT a FROM t2 LIMIT 0)"
+ checkResult(sqlQuery, Seq())
+ }
+
+ @Test
+ def testLimitRemoveWithNotIn(): Unit = {
+ val sqlQuery = "SELECT a FROM t1 WHERE a NOT IN (SELECT a FROM t2 LIMIT 0)"
+ checkResult(sqlQuery, Seq(row(2), row(3), row(3), row(null)))
+ }
+
+ @Test(expected = classOf[TableException])
+ // TODO remove exception after translateToPlanInternal is implemented in BatchExecNestedLoopJoin
+ def testLimitRemoveWithExists(): Unit = {
+ val sqlQuery = "SELECT * FROM t1 WHERE EXISTS (SELECT a FROM t2 LIMIT 0)"
+ checkResult(sqlQuery, Seq())
+ }
+
+ @Test(expected = classOf[TableException])
+ // TODO remove exception after translateToPlanInternal is implemented in BatchExecNestedLoopJoin
+ def testLimitRemoveWithNotExists(): Unit = {
+ val sqlQuery = "SELECT a FROM t1 WHERE NOT EXISTS (SELECT a FROM t2 LIMIT 0)"
+ checkResult(sqlQuery, Seq(row(2), row(3), row(3), row(null))) // single-column rows: only `a`
+ }
+
+ @Test
+ def testLimitRemoveWithSelect(): Unit = {
+ val sqlQuery = "SELECT * FROM (SELECT a FROM t2 LIMIT 0)"
+ checkResult(sqlQuery, Seq())
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
new file mode 100644
index 0000000..6a402d5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableConfigOptions, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendTableSink, TestingUpsertTableSink}
+
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+class Limit0RemoveITCase extends StreamingTestBase() {
+
+ @Before
+ def setup(): Unit = {
+ tEnv.getConfig.getConf.setBoolean(TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED, true)
+ }
+
+ @Test
+ def testSimpleLimitRemove(): Unit = {
+ val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table = ds.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable", table)
+
+ val sql = "SELECT * FROM MyTable LIMIT 0"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ assertEquals(0, sink.getAppendResults.size)
+ }
+
+ @Test
+ def testLimitRemoveWithOrderBy(): Unit = {
+ val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table = ds.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable", table)
+
+ val sql = "SELECT * FROM MyTable ORDER BY a LIMIT 0"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ assertEquals(0, sink.getAppendResults.size)
+ }
+
+ @Test
+ def testLimitRemoveWithSelect(): Unit = {
+ val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table = ds.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable", table)
+
+ val sql = "select a2 from (select cast(a as int) a2 from MyTable limit 0)"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ assertEquals(0, sink.getAppendResults.size)
+ }
+
+ @Test
+ def testLimitRemoveWithIn(): Unit = {
+ val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table1 = ds1.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable1", table1)
+
+ val ds2 = env.fromCollection(Seq(1, 2, 3))
+ val table2 = ds2.toTable(tEnv, 'a) // MyTable2 is backed by its own, smaller data set
+ tEnv.registerTable("MyTable2", table2)
+
+ val sql = "SELECT * FROM MyTable1 WHERE a IN (SELECT a FROM MyTable2 LIMIT 0)"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ assertEquals(0, sink.getAppendResults.size) // the IN sub-query is limited to zero rows
+ }
+
+ @Test
+ def testLimitRemoveWithNotIn(): Unit = {
+ val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table1 = ds1.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable1", table1)
+
+ val ds2 = env.fromCollection(Seq(1, 2, 3))
+ val table2 = ds2.toTable(tEnv, 'a) // MyTable2 is backed by its own, smaller data set
+ tEnv.registerTable("MyTable2", table2)
+
+ val sql = "SELECT * FROM MyTable1 WHERE a NOT IN (SELECT a FROM MyTable2 LIMIT 0)"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ val expected = Seq("1", "2", "3", "4", "5", "6") // every MyTable1 row is expected back
+ assertEquals(expected, sink.getAppendResults.sorted)
+ }
+
+ @Test(expected = classOf[TableException])
+ // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin
+ def testLimitRemoveWithExists(): Unit = {
+ val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table1 = ds1.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable1", table1)
+
+ val ds2 = env.fromCollection(Seq(1, 2, 3))
+ val table2 = ds2.toTable(tEnv, 'a) // MyTable2 is backed by its own, smaller data set
+ tEnv.registerTable("MyTable2", table2)
+
+ val sql = "SELECT * FROM MyTable1 WHERE EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ assertEquals(0, sink.getRawResults.size) // only checked once the expected exception is gone
+ }
+
+ @Test(expected = classOf[TableException])
+ // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin
+ def testLimitRemoveWithNotExists(): Unit = {
+ val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table1 = ds1.toTable(tEnv, 'a)
+ tEnv.registerTable("MyTable1", table1)
+
+ val ds2 = env.fromCollection(Seq(1, 2, 3))
+ val table2 = ds2.toTable(tEnv, 'a) // MyTable2 is backed by its own, smaller data set
+ tEnv.registerTable("MyTable2", table2)
+
+ val sql = "SELECT * FROM MyTable1 WHERE NOT EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ val expected = Seq("1", "2", "3", "4", "5", "6") // every MyTable1 row is expected back
+ assertEquals(expected, sink.getUpsertResults.sorted)
+ }
+
+ @Test
+ def testLimitRemoveWithJoin(): Unit = {
+ val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+ val table1 = ds1.toTable(tEnv, 'a1)
+ tEnv.registerTable("MyTable1", table1)
+
+ val ds2 = env.fromCollection(Seq(1, 2, 3))
+ val table2 = ds2.toTable(tEnv, 'a2) // MyTable2 is backed by its own, smaller data set
+ tEnv.registerTable("MyTable2", table2)
+
+ val sql = "SELECT a1 FROM MyTable1 INNER JOIN (SELECT a2 FROM MyTable2 LIMIT 0) ON true"
+
+ val result = tEnv.sqlQuery(sql)
+ val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+ tEnv.writeToSink(result, sink)
+ tEnv.execute()
+
+ assertEquals(0, sink.getAppendResults.size) // the joined sub-query is limited to zero rows
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index 1839ba6..4f71696 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -249,7 +249,7 @@ final class TestingUpsertSink(keys: Array[Int], tz: TimeZone)
}
}
-final class TestingUpsertTableSink(keys: Array[Int], tz: TimeZone)
+final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
extends UpsertStreamTableSink[BaseRow] {
var fNames: Array[String] = _
var fTypes: Array[TypeInformation[_]] = _
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
index 958c264..b1affec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.runtime.utils
import org.apache.flink.table.`type`.TypeConverters.createExternalTypeInfoFromInternalType
-import org.apache.flink.table.api.{Table, TableImpl}
+import org.apache.flink.table.api.{Table, TableException, TableImpl}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
@@ -40,9 +40,17 @@ object TestSinkUtil {
val rowType = table.asInstanceOf[TableImpl].getRelNode.getRowType
val fieldNames = rowType.getFieldNames.asScala.toArray
val fieldTypes = rowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toInternalType(field.getType))
- .map(createExternalTypeInfoFromInternalType).toArray
- new TestingAppendTableSink().configure(fieldNames, fieldTypes).asInstanceOf[T]
+ .map(field => FlinkTypeFactory.toInternalType(field.getType))
+ .map(createExternalTypeInfoFromInternalType).toArray
+ sink match {
+ case _: TestingAppendTableSink =>
+ new TestingAppendTableSink().configure(fieldNames, fieldTypes).asInstanceOf[T]
+ case s: TestingUpsertTableSink =>
+ new TestingUpsertTableSink(s.keys, s.tz).configure(fieldNames, fieldTypes).asInstanceOf[T]
+ case _: TestingRetractTableSink =>
+ new TestingRetractTableSink().configure(fieldNames, fieldTypes).asInstanceOf[T]
+ case _ => throw new TableException(s"Unsupported sink: $sink")
+ }
}
def fieldToString(field: Any, tz: TimeZone): String = {