You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/28 09:21:48 UTC

[flink] branch master updated: [FLINK-12600][table-planner-blink] Introduce various deterministic rewriting rule, which includes:

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b333ddc  [FLINK-12600][table-planner-blink] Introduce various deterministic rewriting rule, which includes:
b333ddc is described below

commit b333ddc16967ae7428229d659130b61de906ef5f
Author: godfrey he <go...@163.com>
AuthorDate: Tue May 28 17:21:34 2019 +0800

    [FLINK-12600][table-planner-blink] Introduce various deterministic rewriting rule, which includes:
    
    1. FlinkLimit0RemoveRule, that rewrites `limit 0` to empty Values
    
    2. FlinkRewriteSubQueryRule, that rewrites a Filter with condition: `(select count from T) > 0` to a Filter with condition: `exists(select * from T)`
    
    3. ReplaceIntersectWithSemiJoinRule, that rewrites distinct Intersect to a distinct Aggregate on a SEMI Join
    
    4. ReplaceMinusWithAntiJoinRule, that rewrites distinct Minus to a distinct Aggregate on an ANTI Join
    
    This closes #8520
---
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  13 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  13 +-
 .../plan/rules/logical/FlinkCalcMergeRule.scala    |   2 +-
 .../plan/rules/logical/FlinkLimit0RemoveRule.scala |  50 ++
 .../plan/rules/logical/FlinkPruneEmptyRules.scala  |  70 +++
 .../rules/logical/FlinkRewriteSubQueryRule.scala   | 168 ++++++
 .../logical/ReplaceIntersectWithSemiJoinRule.scala |  61 ++
 .../logical/ReplaceMinusWithAntiJoinRule.scala     |  61 ++
 .../logical/ReplaceSetOpWithJoinRuleBase.scala     |  58 ++
 .../flink/table/plan/batch/sql/LimitTest.xml       |  23 +-
 .../table/plan/batch/sql/SetOperatorsTest.xml      | 224 ++++++++
 .../flink/table/plan/batch/sql/SortLimitTest.xml   |  20 +-
 .../table/plan/batch/sql/SubplanReuseTest.xml      |  39 ++
 .../rules/logical/FlinkLimit0RemoveRuleTest.xml    | 214 +++++++
 .../rules/logical/FlinkPruneEmptyRulesTest.xml     |  63 +++
 .../ReplaceIntersectWithSemiJoinRuleTest.xml       | 123 +++++
 .../logical/ReplaceMinusWithAntiJoinRuleTest.xml   | 123 +++++
 .../subquery/FlinkRewriteSubQueryRuleTest.xml      | 612 +++++++++++++++++++++
 .../flink/table/plan/stream/sql/LimitTest.xml      |  60 +-
 .../table/plan/stream/sql/SetOperatorsTest.xml     | 226 ++++++++
 .../flink/table/plan/stream/sql/SortLimitTest.xml  |  80 +--
 .../table/plan/stream/sql/SubplanReuseTest.xml     |  37 ++
 .../flink/table/plan/batch/sql/LimitTest.scala     |   1 -
 .../table/plan/batch/sql/SetOperatorsTest.scala    | 129 +++++
 .../flink/table/plan/batch/sql/SortLimitTest.scala |   1 -
 .../table/plan/batch/sql/SubplanReuseTest.scala    |   3 +-
 .../rules/logical/FlinkLimit0RemoveRuleTest.scala  | 101 ++++
 .../rules/logical/FlinkPruneEmptyRulesTest.scala   |  73 +++
 .../ReplaceIntersectWithSemiJoinRuleTest.scala     |  84 +++
 .../logical/ReplaceMinusWithAntiJoinRuleTest.scala |  82 +++
 .../subquery/FlinkRewriteSubQueryRuleTest.scala    | 211 +++++++
 .../flink/table/plan/stream/sql/LimitTest.scala    |   1 -
 .../table/plan/stream/sql/SetOperatorsTest.scala   | 127 +++++
 .../table/plan/stream/sql/SortLimitTest.scala      |   1 -
 .../table/plan/stream/sql/SubplanReuseTest.scala   |   3 +-
 .../org/apache/flink/table/plan/util/pojos.scala   |  10 +
 .../runtime/batch/sql/Limit0RemoveITCase.scala     |  98 ++++
 .../runtime/stream/sql/Limit0RemoveITCase.scala    | 187 +++++++
 .../flink/table/runtime/utils/StreamTestSink.scala |   2 +-
 .../flink/table/runtime/utils/TestSinkUtil.scala   |  16 +-
 40 files changed, 3303 insertions(+), 167 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index d7e45b3..1401605 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -34,6 +34,7 @@ object FlinkBatchRuleSets {
 
   val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
     SimplifyFilterConditionRule.EXTENDED,
+    FlinkRewriteSubQueryRule.FILTER,
     FlinkSubQueryRemoveRule.FILTER,
     JoinConditionTypeCoerceRule.INSTANCE,
     FlinkJoinPushExpressionsRule.INSTANCE
@@ -116,7 +117,9 @@ object FlinkBatchRuleSets {
         new CoerceInputsRule(classOf[LogicalIntersect], false),
         //ensure except set operator have the same row type
         new CoerceInputsRule(classOf[LogicalMinus], false),
-        ConvertToNotInOrInRule.INSTANCE
+        ConvertToNotInOrInRule.INSTANCE,
+        // optimize limit 0
+        FlinkLimit0RemoveRule.INSTANCE
       )).asJava)
 
   /**
@@ -159,7 +162,7 @@ object FlinkBatchRuleSets {
     PruneEmptyRules.AGGREGATE_INSTANCE,
     PruneEmptyRules.FILTER_INSTANCE,
     PruneEmptyRules.JOIN_LEFT_INSTANCE,
-    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+    FlinkPruneEmptyRules.JOIN_RIGHT_INSTANCE,
     PruneEmptyRules.PROJECT_INSTANCE,
     PruneEmptyRules.SORT_INSTANCE,
     PruneEmptyRules.UNION_INSTANCE
@@ -260,7 +263,11 @@ object FlinkBatchRuleSets {
     // semi/anti join transpose rule
     FlinkSemiAntiJoinJoinTransposeRule.INSTANCE,
     FlinkSemiAntiJoinProjectTransposeRule.INSTANCE,
-    FlinkSemiAntiJoinFilterTransposeRule.INSTANCE
+    FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,
+
+    // set operators
+    ReplaceIntersectWithSemiJoinRule.INSTANCE,
+    ReplaceMinusWithAntiJoinRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index ce99a98..419306e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -34,6 +34,7 @@ object FlinkStreamRuleSets {
 
   val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
     SimplifyFilterConditionRule.EXTENDED,
+    FlinkRewriteSubQueryRule.FILTER,
     FlinkSubQueryRemoveRule.FILTER,
     JoinConditionTypeCoerceRule.INSTANCE,
     FlinkJoinPushExpressionsRule.INSTANCE
@@ -118,7 +119,9 @@ object FlinkStreamRuleSets {
         new CoerceInputsRule(classOf[LogicalIntersect], false),
         //ensure except set operator have the same row type
         new CoerceInputsRule(classOf[LogicalMinus], false),
-        ConvertToNotInOrInRule.INSTANCE
+        ConvertToNotInOrInRule.INSTANCE,
+        // optimize limit 0
+        FlinkLimit0RemoveRule.INSTANCE
       )
     ).asJava)
 
@@ -157,7 +160,7 @@ object FlinkStreamRuleSets {
     PruneEmptyRules.AGGREGATE_INSTANCE,
     PruneEmptyRules.FILTER_INSTANCE,
     PruneEmptyRules.JOIN_LEFT_INSTANCE,
-    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+    FlinkPruneEmptyRules.JOIN_RIGHT_INSTANCE,
     PruneEmptyRules.PROJECT_INSTANCE,
     PruneEmptyRules.SORT_INSTANCE,
     PruneEmptyRules.UNION_INSTANCE
@@ -232,7 +235,11 @@ object FlinkStreamRuleSets {
     // semi/anti join transpose rule
     FlinkSemiAntiJoinJoinTransposeRule.INSTANCE,
     FlinkSemiAntiJoinProjectTransposeRule.INSTANCE,
-    FlinkSemiAntiJoinFilterTransposeRule.INSTANCE
+    FlinkSemiAntiJoinFilterTransposeRule.INSTANCE,
+
+    // set operators
+    ReplaceIntersectWithSemiJoinRule.INSTANCE,
+    ReplaceMinusWithAntiJoinRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
index 8deddfb..550c318 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.tools.RelBuilderFactory
 import scala.collection.JavaConversions._
 
 /**
-  * This rules is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]].
+  * This rule is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]].
   *
   * Modification:
   * - Condition in the merged program will be simplified if it exists.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRule.scala
new file mode 100644
index 0000000..73190fa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRule.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rex.RexLiteral
+
+/**
+  * Planner rule that rewrites `limit 0` to an empty [[org.apache.calcite.rel.core.Values]].
+  */
+class FlinkLimit0RemoveRule extends RelOptRule(
+  operand(classOf[Sort], any()),
+  "FlinkLimit0RemoveRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val sort: Sort = call.rel(0)
+    sort.fetch != null && RexLiteral.intValue(sort.fetch) == 0
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val sort: Sort = call.rel(0)
+    val emptyValues = call.builder().values(sort.getRowType).build()
+    call.transformTo(emptyValues)
+
+    // The empty Values is always better than the Sort, so set the Sort's importance to 0.
+    call.getPlanner.setImportance(sort, 0.0)
+  }
+}
+
+object FlinkLimit0RemoveRule {
+  val INSTANCE = new FlinkLimit0RemoveRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala
new file mode 100644
index 0000000..f85087b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRules.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType, Values}
+
+object FlinkPruneEmptyRules {
+
+  /**
+    * This rule is copied from Calcite's
+    * [[org.apache.calcite.rel.rules.PruneEmptyRules#JOIN_RIGHT_INSTANCE]].
+    *
+    * Modification:
+    * - An ANTI join whose right input is empty is replaced by its left input.
+    *
+    * Rule that converts a non-null-generating [[Join]] to empty if its right child is empty.
+    *
+    * <p>Examples:
+    *
+    * <ul>
+    * <li>Join(Scan(Emp), Empty, INNER) becomes Empty
+    * </ul>
+    */
+  val JOIN_RIGHT_INSTANCE: RelOptRule = new RelOptRule(
+    operand(classOf[Join],
+      some(operand(classOf[RelNode], any),
+        operand(classOf[Values], none))),
+    "FlinkPruneEmptyRules(right)") {
+
+    override def matches(call: RelOptRuleCall): Boolean = {
+      val right: Values = call.rel(2)
+      Values.IS_EMPTY.apply(right)
+    }
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+      val join: Join = call.rel(0)
+      join.getJoinType match {
+        case JoinRelType.ANTI =>
+          // "select * from emp where deptno not in (select deptno from dept where 1=0)"
+          // return emp
+          call.transformTo(call.builder().push(join.getLeft).build)
+        case _ =>
+          if (join.getJoinType.generatesNullsOnRight) {
+            // "select * from emp left join dept" is not necessarily empty if dept is empty
+          } else {
+            call.transformTo(call.builder.push(join).empty.build)
+          }
+      }
+    }
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala
new file mode 100644
index 0000000..8d97e34
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkRewriteSubQueryRule.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operandJ}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Aggregate, Filter, RelFactories}
+import org.apache.calcite.rex.{RexShuttle, _}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.`type`.SqlTypeFamily
+import org.apache.calcite.sql.fun.SqlCountAggFunction
+import org.apache.calcite.tools.RelBuilderFactory
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites scalar query in filter like:
+  * `select * from T1 where (select count(*) from T2) > 0`
+  * to
+  * `select * from T1 where exists (select * from T2)`,
+  * which could be converted to SEMI join by [[FlinkSubQueryRemoveRule]].
+  *
+  * Without this rule, the original query will be rewritten to a filter on a join on an aggregate
+  * by [[org.apache.calcite.rel.rules.SubQueryRemoveRule]]. The full logical plan is:
+  * {{{
+  * LogicalProject(a=[$0], b=[$1], c=[$2])
+  * +- LogicalJoin(condition=[$3], joinType=[semi])
+  *    :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+  *    +- LogicalProject($f0=[IS NOT NULL($0)])
+  *       +- LogicalAggregate(group=[{}], m=[MIN($0)])
+  *          +- LogicalProject(i=[true])
+  *             +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+  * }}}
+  */
+class FlinkRewriteSubQueryRule(
+    operand: RelOptRuleOperand,
+    relBuilderFactory: RelBuilderFactory,
+    description: String)
+  extends RelOptRule(operand, relBuilderFactory, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val filter: Filter = call.rel(0)
+    val condition = filter.getCondition
+    val newCondition = rewriteScalarQuery(condition)
+    if (RexUtil.eq(condition, newCondition)) {
+      return
+    }
+
+    val newFilter = filter.copy(filter.getTraitSet, filter.getInput, newCondition)
+    call.transformTo(newFilter)
+  }
+
+  // scalar query like: `(select count(*) from T) > 0` can be converted to `exists(select * from T)`
+  def rewriteScalarQuery(condition: RexNode): RexNode = {
+    condition.accept(new RexShuttle() {
+      override def visitCall(call: RexCall): RexNode = {
+        val subQuery = getSupportedScalarQuery(call)
+        subQuery match {
+          case Some(sq) =>
+            val aggInput = sq.rel.getInput(0)
+            RexSubQuery.exists(aggInput)
+          case _ => super.visitCall(call)
+        }
+      }
+    })
+  }
+
+  private def isScalarQuery(n: RexNode): Boolean = n.isA(SqlKind.SCALAR_QUERY)
+
+  private def getSupportedScalarQuery(call: RexCall): Option[RexSubQuery] = {
+    // checks that the RexNode is a numeric RexLiteral whose value is between 0 and 1
+    def isBetween0And1(n: RexNode, include0: Boolean, include1: Boolean): Boolean = {
+      n match {
+        case l: RexLiteral =>
+          l.getTypeName.getFamily match {
+            case SqlTypeFamily.NUMERIC if l.getValue != null =>
+              val v = l.getValue.toString.toDouble
+              (0.0 < v && v < 1.0) || (include0 && v == 0.0) || (include1 && v == 1.0)
+            case _ => false
+          }
+        case _ => false
+      }
+    }
+
+    // checks that the RelNode is a non-grouping Aggregate with a single plain count(*) call
+    def isCountStarAggWithoutGroupBy(n: RelNode): Boolean = {
+      n match {
+        case agg: Aggregate =>
+          if (agg.getGroupCount == 0 && agg.getAggCallList.size() == 1) {
+            val aggCall = agg.getAggCallList.head
+            !aggCall.isDistinct &&
+              aggCall.filterArg < 0 &&
+              aggCall.getArgList.isEmpty &&
+              aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]
+          } else {
+            false
+          }
+        case _ => false
+      }
+    }
+
+    call.getKind match {
+      // (select count(*) from T) > X (X is between 0 (inclusive) and 1 (exclusive))
+      case SqlKind.GREATER_THAN if isScalarQuery(call.operands.head) =>
+        val subQuery = call.operands.head.asInstanceOf[RexSubQuery]
+        if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+          isBetween0And1(call.operands.last, include0 = true, include1 = false)) {
+          Some(subQuery)
+        } else {
+          None
+        }
+      // (select count(*) from T) >= X (X is between 0 (exclusive) and 1 (inclusive))
+      case SqlKind.GREATER_THAN_OR_EQUAL if isScalarQuery(call.operands.head) =>
+        val subQuery = call.operands.head.asInstanceOf[RexSubQuery]
+        if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+          isBetween0And1(call.operands.last, include0 = false, include1 = true)) {
+          Some(subQuery)
+        } else {
+          None
+        }
+      // X < (select count(*) from T) (X is between 0 (inclusive) and 1 (exclusive))
+      case SqlKind.LESS_THAN if isScalarQuery(call.operands.last) =>
+        val subQuery = call.operands.last.asInstanceOf[RexSubQuery]
+        if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+          isBetween0And1(call.operands.head, include0 = true, include1 = false)) {
+          Some(subQuery)
+        } else {
+          None
+        }
+      // X <= (select count(*) from T) (X is between 0 (exclusive) and 1 (inclusive))
+      case SqlKind.LESS_THAN_OR_EQUAL if isScalarQuery(call.operands.last) =>
+        val subQuery = call.operands.last.asInstanceOf[RexSubQuery]
+        if (isCountStarAggWithoutGroupBy(subQuery.rel) &&
+          isBetween0And1(call.operands.head, include0 = false, include1 = true)) {
+          Some(subQuery)
+        } else {
+          None
+        }
+      case _ => None
+    }
+  }
+}
+
+object FlinkRewriteSubQueryRule {
+
+  val FILTER = new FlinkRewriteSubQueryRule(
+    operandJ(classOf[Filter], null, RexUtil.SubQueryFinder.FILTER_PREDICATE, any),
+    RelFactories.LOGICAL_BUILDER,
+    "FlinkRewriteSubQueryRule:Filter")
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
new file mode 100644
index 0000000..6ac7dea
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Aggregate, Intersect, Join, JoinRelType}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that replaces distinct [[Intersect]] with
+  * a distinct [[Aggregate]] on a SEMI [[Join]].
+  *
+  * <p>Note: Intersect All and Intersect with more than two inputs are not supported.
+  */
+class ReplaceIntersectWithSemiJoinRule extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect],
+  "ReplaceIntersectWithSemiJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val intersect: Intersect = call.rel(0)
+    // only distinct, binary intersect is supported: onMatch reads inputs 0 and 1 only.
+    intersect.isDistinct && intersect.getInputs.size == 2
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val intersect: Intersect = call.rel(0)
+    val left = intersect.getInput(0)
+    val right = intersect.getInput(1)
+
+    val relBuilder = call.builder
+    val keys = 0 until left.getRowType.getFieldCount
+    val conditions = generateCondition(relBuilder, left, right, keys)
+
+    relBuilder.push(left)
+    relBuilder.push(right)
+    relBuilder.join(JoinRelType.SEMI, conditions).aggregate(relBuilder.groupKey(keys: _*))
+    val rel = relBuilder.build()
+    call.transformTo(rel)
+  }
+}
+
+object ReplaceIntersectWithSemiJoinRule {
+  val INSTANCE: RelOptRule = new ReplaceIntersectWithSemiJoinRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
new file mode 100644
index 0000000..c322cf9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRule.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that replaces distinct [[Minus]] (SQL keyword: EXCEPT) with
+  * a distinct [[Aggregate]] on an ANTI [[Join]].
+  *
+  * <p>Note: Minus All and Minus with more than two inputs are not supported.
+  */
+class ReplaceMinusWithAntiJoinRule extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Minus],
+  "ReplaceMinusWithAntiJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val minus: Minus = call.rel(0)
+    // only distinct, binary minus is supported: onMatch reads inputs 0 and 1 only.
+    minus.isDistinct && minus.getInputs.size == 2
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val minus: Minus = call.rel(0)
+    val left = minus.getInput(0)
+    val right = minus.getInput(1)
+
+    val relBuilder = call.builder
+    val keys = 0 until left.getRowType.getFieldCount
+    val conditions = generateCondition(relBuilder, left, right, keys)
+
+    relBuilder.push(left)
+    relBuilder.push(right)
+    relBuilder.join(JoinRelType.ANTI, conditions).aggregate(relBuilder.groupKey(keys: _*))
+    val rel = relBuilder.build()
+    call.transformTo(rel)
+  }
+}
+
+object ReplaceMinusWithAntiJoinRule {
+  val INSTANCE: RelOptRule = new ReplaceMinusWithAntiJoinRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
new file mode 100644
index 0000000..1f400a0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptUtil}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{RelFactories, SetOp}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+/**
+  * Base class for rules that replace a [[SetOp]] with a [[org.apache.calcite.rel.core.Join]].
+  */
+abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
+    clazz: Class[T],
+    description: String)
+  extends RelOptRule(
+    operand(clazz, any),
+    RelFactories.LOGICAL_BUILDER,
+    description) {
+
+  protected def generateCondition(
+      relBuilder: RelBuilder,
+      left: RelNode,
+      right: RelNode,
+      keys: Seq[Int]): Seq[RexNode] = {
+    val rexBuilder = relBuilder.getRexBuilder
+    val leftTypes = RelOptUtil.getFieldTypeList(left.getRowType)
+    val rightTypes = RelOptUtil.getFieldTypeList(right.getRowType)
+    val conditions = keys.map { key =>
+      val leftRex = rexBuilder.makeInputRef(leftTypes.get(key), key)
+      val rightRex = rexBuilder.makeInputRef(rightTypes.get(key), leftTypes.size + key)
+      val equalCond = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, leftRex, rightRex)
+      relBuilder.or( // null-safe match: the keys are equal, or both sides are NULL
+        equalCond,
+        relBuilder.and(relBuilder.isNull(leftRex), relBuilder.isNull(rightRex)))
+    }
+    conditions
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml
index 731f1b5..54f0292 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/LimitTest.xml
@@ -29,11 +29,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[0], global=[false])
-         +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -92,11 +88,7 @@ LogicalSort(offset=[10], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[0], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[10], global=[false])
-         +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -134,11 +126,7 @@ LogicalSort(offset=[0], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0], global=[true])
-   +- Exchange(distribution=[single])
-      +- Limit(offset=[0], fetch=[0], global=[false])
-         +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -155,10 +143,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Limit(offset=[0], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
-   +- Limit(offset=[0], fetch=[0], global=[false])
-      +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
new file mode 100644
index 0000000..a3fada8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -0,0 +1,224 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testIntersect">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[c], select=[c])
++- HashJoin(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
+   :- Exchange(distribution=[hash[c]])
+   :  +- Calc(select=[c])
+   :     +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[f]])
+      +- Calc(select=[f])
+         +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectLeftIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalFilter(condition=[=(1, 0)])
+:     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinus">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[c], select=[c])
++- HashJoin(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
+   :- Exchange(distribution=[hash[c]])
+   :  +- Calc(select=[c])
+   :     +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[f]])
+      +- Calc(select=[f])
+         +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinusLeftIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalFilter(condition=[=(1, 0)])
+:     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]], values=[c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinusRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+   +- LocalHashAggregate(groupBy=[c], select=[c])
+      +- Calc(select=[c])
+         +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinusWithNestedTypes">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(a=[$0], b=[$1], c=[$2])
+:  +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[false], groupBy=[a, b, c], select=[a, b, c])
++- HashJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c], build=[left])
+   :- Exchange(distribution=[hash[a, b, c]], exchange_mode=[BATCH], reuse_id=[1])
+   :  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnionNullableTypes">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+:  +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(EXPR$0=[CASE(>($2, 0), $1, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2))])
+   +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+:  +- TableSourceScan(table=[[A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
++- Calc(select=[CASE(>(c, 0), b, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2)) AS EXPR$0])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnionAnyType">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM A UNION ALL SELECT b FROM A]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+:  +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
++- LogicalProject(b=[$1])
+   +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+:  +- TableSourceScan(table=[[A, source: [TestTableSource(a, b)]]], fields=[a, b], reuse_id=[1])
++- Calc(select=[b])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml
index e4f1be9..94c59b3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SortLimitTest.xml
@@ -49,10 +49,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortLimit(orderBy=[a DESC, b ASC], offset=[1], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
-   +- SortLimit(orderBy=[a DESC, b ASC], offset=[0], fetch=[1], global=[false])
-      +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -88,10 +85,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
-   +- SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[false])
-      +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -147,10 +141,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
-   +- SortLimit(orderBy=[a DESC], offset=[0], fetch=[0], global=[false])
-      +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -207,10 +198,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortLimit(orderBy=[a DESC, b ASC], offset=[1], fetch=[0], global=[true])
-+- Exchange(distribution=[single])
-   +- SortLimit(orderBy=[a DESC, b ASC], offset=[0], fetch=[1], global=[false])
-      +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Values(tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index a1cc277..a1cbb1a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -1069,4 +1069,43 @@ SortMergeJoin(joinType=[InnerJoin], where=[=(a, d0)], select=[a, b, c, d, e, f,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSubplanReuseWithDynamicFunction">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalIntersect(all=[false])
+:  :- LogicalProject(random=[$0])
+:  :  +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+:  :     +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+:  :        +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+:  +- LogicalProject(random=[$0])
+:     +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+:        +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+:           +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(random=[$0])
+   +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+      +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+         +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+:- SortAggregate(isMerge=[false], groupBy=[random], select=[random])
+:  +- Sort(orderBy=[random ASC])
+:     +- Exchange(distribution=[hash[random]])
+:        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+:           :- Exchange(distribution=[any], exchange_mode=[BATCH])
+:           :  +- Calc(select=[random], reuse_id=[1])
+:           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
+:           :        +- Exchange(distribution=[single])
+:           :           +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[false])
+:           :              +- Calc(select=[a AS random, RAND() AS EXPR$1])
+:           :                 +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:           +- Exchange(distribution=[broadcast], reuse_id=[2])
+:              +- Reused(reference_id=[1])
++- Reused(reference_id=[2])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
new file mode 100644
index 0000000..2a2949e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testLimitZeroWithExists">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE EXISTS (SELECT a FROM MyTable LIMIT 0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalSort(fetch=[0])
+  LogicalProject(a=[$0])
+    LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+})])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithNotIn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a NOT IN (SELECT a FROM MyTable LIMIT 0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalSort(fetch=[0])
+  LogicalProject(a=[$0])
+    LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+}))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[OR(=($0, $3), IS NULL($0), IS NULL($3))], joinType=[anti])
+   :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithIn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a IN (SELECT a FROM MyTable LIMIT 0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalSort(fetch=[0])
+  LogicalProject(a=[$0])
+    LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+})])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable INNER JOIN (SELECT * FROM MyTable Limit 0) ON TRUE]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalSort(fetch=[0])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2])
+         +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithNotExists">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalSort(fetch=[0])
+  LogicalProject(a=[$0])
+    LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+}))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[anti])
+   :- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimpleLimitZero">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable LIMIT 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithOffset">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable ORDER BY a LIMIT 0 OFFSET 10]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], offset=[10], fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithSelect">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (SELECT a FROM MyTable LIMIT 0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalSort(fetch=[0])
+   +- LogicalProject(a=[$0])
+      +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLimitZeroWithOrderBy">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable ORDER BY a LIMIT 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[0])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
new file mode 100644
index 0000000..ddc7d13
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testSemiJoinRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE a IN (SELECT d FROM T2 WHERE 1=0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+  LogicalFilter(condition=[=(1, 0)])
+    LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+})])
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalValues(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAntiJoinRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE a NOT IN (SELECT d FROM T2 WHERE 1=0)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(d=[$0])
+  LogicalFilter(condition=[=(1, 0)])
+    LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+}))])
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
new file mode 100644
index 0000000..0d5a4ec
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testIntersect">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
+   :- LogicalProject(c=[$2])
+   :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(f=[$2])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectLeftIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalFilter(condition=[=(1, 0)])
+:     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
+   :- LogicalProject(c=[$2])
+   :  +- LogicalFilter(condition=[=(1, 0)])
+   :     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(f=[$2])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
+   :- LogicalProject(c=[$2])
+   :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(f=[$2])
+      +- LogicalFilter(condition=[=(1, 0)])
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM ((SELECT * FROM T1) INTERSECT (SELECT * FROM T2)) WHERE a > 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, 1)])
+   +- LogicalIntersect(all=[false])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2])
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalProject(d=[$0], e=[$1], f=[$2])
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, 1)])
+   +- LogicalAggregate(group=[{0, 1, 2}])
+      +- LogicalJoin(condition=[AND(OR(=($0, $3), AND(IS NULL($0), IS NULL($3))), OR(=($1, $4), AND(IS NULL($1), IS NULL($4))), OR(=($2, $5), AND(IS NULL($2), IS NULL($5))))], joinType=[semi])
+         :- LogicalProject(a=[$0], b=[$1], c=[$2])
+         :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+         +- LogicalProject(d=[$0], e=[$1], f=[$2])
+            +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
new file mode 100644
index 0000000..02494da
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testExcept">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
+   :- LogicalProject(c=[$2])
+   :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(f=[$2])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testExceptLeftIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalFilter(condition=[=(1, 0)])
+:     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
+   :- LogicalProject(c=[$2])
+   :  +- LogicalFilter(condition=[=(1, 0)])
+   :     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(f=[$2])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testExceptRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalAggregate(group=[{0}])
++- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
+   :- LogicalProject(c=[$2])
+   :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(f=[$2])
+      +- LogicalFilter(condition=[=(1, 0)])
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testExceptWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM (SELECT * FROM T1 EXCEPT (SELECT * FROM T2)) WHERE b < 2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[<($1, 2)])
+   +- LogicalMinus(all=[false])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2])
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalProject(d=[$0], e=[$1], f=[$2])
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[<($1, 2)])
+   +- LogicalAggregate(group=[{0, 1, 2}])
+      +- LogicalJoin(condition=[AND(OR(=($0, $3), AND(IS NULL($0), IS NULL($3))), OR(=($1, $4), AND(IS NULL($1), IS NULL($4))), OR(=($2, $5), AND(IS NULL($2), IS NULL($5))))], joinType=[anti])
+         :- LogicalProject(a=[$0], b=[$1], c=[$2])
+         :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
+         +- LogicalProject(d=[$0], e=[$1], f=[$2])
+            +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
new file mode 100644
index 0000000..a4ef320
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.xml
@@ -0,0 +1,612 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testNotCountStarInScalarQuery">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(e) FROM y WHERE d > 10) > 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
+  LogicalProject(e=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[>($3, 0)])
+      +- LogicalJoin(condition=[true], joinType=[left])
+         :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+         +- LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
+            +- LogicalProject(e=[$1])
+               +- LogicalFilter(condition=[>($0, 10)])
+                  +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNotEmptyGroupByInScalarQuery">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10 GROUP BY f) > 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+    LogicalProject(f=[$2])
+      LogicalFilter(condition=[>($0, 10)])
+        LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[>($3, 0)])
+      +- LogicalJoin(condition=[true], joinType=[left])
+         :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+         +- LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])
+            +- LogicalProject(EXPR$0=[$1])
+               +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
+                  +- LogicalProject(f=[$2])
+                     +- LogicalFilter(condition=[>($0, 10)])
+                        +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSqlFromTpcDsQ41">
+    <Resource name="sql">
+      <![CDATA[
+SELECT DISTINCT (i_product_name)
+FROM item i1
+WHERE i_manufact_id BETWEEN 738 AND 738 + 40
+  AND (SELECT count(*) AS item_cnt
+FROM item
+WHERE (i_manufact = i1.i_manufact AND
+  ((i_category = 'Women' AND
+    (i_color = 'powder' OR i_color = 'khaki') AND
+    (i_units = 'Ounce' OR i_units = 'Oz') AND
+    (i_size = 'medium' OR i_size = 'extra large')
+  ) OR
+    (i_category = 'Women' AND
+      (i_color = 'brown' OR i_color = 'honeydew') AND
+      (i_units = 'Bunch' OR i_units = 'Ton') AND
+      (i_size = 'N/A' OR i_size = 'small')
+    ) OR
+    (i_category = 'Men' AND
+      (i_color = 'floral' OR i_color = 'deep') AND
+      (i_units = 'N/A' OR i_units = 'Dozen') AND
+      (i_size = 'petite' OR i_size = 'large')
+    ) OR
+    (i_category = 'Men' AND
+      (i_color = 'light' OR i_color = 'cornflower') AND
+      (i_units = 'Box' OR i_units = 'Pound') AND
+      (i_size = 'medium' OR i_size = 'extra large')
+    ))) OR
+  (i_manufact = i1.i_manufact AND
+    ((i_category = 'Women' AND
+      (i_color = 'midnight' OR i_color = 'snow') AND
+      (i_units = 'Pallet' OR i_units = 'Gross') AND
+      (i_size = 'medium' OR i_size = 'extra large')
+    ) OR
+      (i_category = 'Women' AND
+        (i_color = 'cyan' OR i_color = 'papaya') AND
+        (i_units = 'Cup' OR i_units = 'Dram') AND
+        (i_size = 'N/A' OR i_size = 'small')
+      ) OR
+      (i_category = 'Men' AND
+        (i_color = 'orange' OR i_color = 'frosted') AND
+        (i_units = 'Each' OR i_units = 'Tbl') AND
+        (i_size = 'petite' OR i_size = 'large')
+      ) OR
+      (i_category = 'Men' AND
+        (i_color = 'forest' OR i_color = 'ghost') AND
+        (i_units = 'Lb' OR i_units = 'Bundle') AND
+        (i_size = 'medium' OR i_size = 'extra large')
+      )))) > 0
+ORDER BY i_product_name
+LIMIT 100
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100])
++- LogicalAggregate(group=[{0}])
+   +- LogicalProject(i_product_name=[$2])
+      +- LogicalFilter(condition=[AND(>=($0, 738), <=($0, +(738, 40)), >($SCALAR_QUERY({
+LogicalAggregate(group=[{}], item_cnt=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[OR(AND(=($1, $cor0.i_manufact), OR(AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, _UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, _UTF-16LE'extra large'))), AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), =($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF [...]
+      LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+}), 0))], variablesSet=[[$cor0]])
+         +- LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[100])
++- LogicalAggregate(group=[{0}])
+   +- LogicalProject(i_product_name=[$2])
+      +- LogicalFilter(condition=[AND(>=($0, 738), <=($0, +(738, 40)))])
+         +- LogicalJoin(condition=[=($7, $1)], joinType=[semi])
+            :- LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+            +- LogicalProject(i_manufact=[$1])
+               +- LogicalFilter(condition=[OR(AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'powder'), =($4, _UTF-16LE'khaki')), OR(=($5, _UTF-16LE'Ounce'), =($5, _UTF-16LE'Oz')), OR(=($6, _UTF-16LE'medium'), =($6, _UTF-16LE'extra large'))), AND(=($3, _UTF-16LE'Women'), OR(=($4, _UTF-16LE'brown'), =($4, _UTF-16LE'honeydew')), OR(=($5, _UTF-16LE'Bunch'), =($5, _UTF-16LE'Ton')), OR(=($6, _UTF-16LE'N/A'), =($6, _UTF-16LE'small'))), AND(=($3, _UTF-16LE'Men'), OR(=($4, _UTF-16LE'floral'), =( [...]
+                  +- LogicalTableScan(table=[[item, source: [TestTableSource(i_manufact_id, i_manufact, i_product_name, i_category, i_color, i_units, i_size)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0.9]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.9:DECIMAL(2, 1))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 1)], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 0.1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.1:DECIMAL(2, 1))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation5">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE a = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation6">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE a = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0.99:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation7">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE a = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(1, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithCorrelation8">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE a = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(0.01:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[=($cor0.a, $0)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0)])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0.9]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.9:DECIMAL(2, 1))])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 1)])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 0.1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>=($SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}), 0.1:DECIMAL(2, 1))])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation5">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation6">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<(0.99:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation7">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(1, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSupportedConversionWithoutCorrelation8">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE d > 10)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[<=(0.01:DECIMAL(3, 2), $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+}))])
+   +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[$3], joinType=[semi])
+   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject($f0=[IS NOT NULL($0)])
+      +- LogicalAggregate(group=[{}], m=[MIN($0)])
+         +- LogicalProject(i=[true])
+            +- LogicalFilter(condition=[>($0, 10)])
+               +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml
index 285ee01..3b6ffc6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/LimitTest.xml
@@ -29,10 +29,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -89,10 +86,7 @@ LogicalSort(offset=[10], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[10], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -109,10 +103,7 @@ LogicalSort(offset=[0], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, c])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -129,10 +120,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -149,10 +137,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(proctime) AS proctime, c])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -169,10 +154,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[PROCTIME_MATERIALIZE(proctime) AS desc, c])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -189,10 +171,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[c, PROCTIME_MATERIALIZE(proctime) AS desc])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -209,10 +188,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[c, PROCTIME_MATERIALIZE(proctime) AS proctime])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -229,10 +205,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[rowtime, c])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -249,10 +222,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[rowtime AS desc, c])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -269,10 +239,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[c, rowtime AS desc])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -289,10 +256,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[c, rowtime])
-+- Limit(offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
new file mode 100644
index 0000000..d30ecdd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
@@ -0,0 +1,226 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testIntersect">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+   +- Join(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c])
+      :- Exchange(distribution=[hash[c]])
+      :  +- Calc(select=[c])
+      :     +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- Exchange(distribution=[hash[f]])
+         +- Calc(select=[f])
+            +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectLeftIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalFilter(condition=[=(1, 0)])
+:     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testIntersectRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinus">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+   +- Join(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c])
+      :- Exchange(distribution=[hash[c]])
+      :  +- Calc(select=[c])
+      :     +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- Exchange(distribution=[hash[f]])
+         +- Calc(select=[f])
+            +- TableSourceScan(table=[[T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinusLeftIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalFilter(condition=[=(1, 0)])
+:     +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Values(tuples=[[]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinusRightIsEmpty">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(c=[$2])
+:  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(f=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[T2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[c], select=[c])
++- Exchange(distribution=[hash[c]])
+   +- Calc(select=[c])
+      +- TableSourceScan(table=[[T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMinusWithNestedTypes">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalMinus(all=[false])
+:- LogicalProject(a=[$0], b=[$1], c=[$2])
+:  +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[a, b, c], select=[a, b, c])
++- Exchange(distribution=[hash[a, b, c]])
+   +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(a, a0), AND(IS NULL(a), IS NULL(a0))), OR(=(b, b0), AND(IS NULL(b), IS NULL(b0))), OR(=(c, c0), AND(IS NULL(c), IS NULL(c0))))], select=[a, b, c])
+      :- Exchange(distribution=[hash[a, b, c]], reuse_id=[1])
+      :  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnionNullableTypes">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+:  +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(EXPR$0=[CASE(>($2, 0), $1, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2))])
+   +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+:  +- TableSourceScan(table=[[A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
++- Calc(select=[CASE(>(c, 0), b, null:RecordType:peek_no_expand(INTEGER _1, VARCHAR(65536) CHARACTER SET "UTF-16LE" _2)) AS EXPR$0])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUnionAnyType">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM A UNION ALL SELECT b FROM A]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(a=[$0])
+:  +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
++- LogicalProject(b=[$1])
+   +- LogicalTableScan(table=[[A, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[a])
+:- Calc(select=[a])
+:  +- TableSourceScan(table=[[A, source: [TestTableSource(a, b)]]], fields=[a, b], reuse_id=[1])
++- Calc(select=[b])
+   +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml
index 0faf861..256ca26 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SortLimitTest.xml
@@ -217,10 +217,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[proctime ASC, c ASC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -259,10 +256,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[proctime ASC, c ASC], offset=[1], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -301,10 +295,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[proctime DESC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -427,10 +418,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC, proctime DESC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -574,10 +562,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -616,10 +601,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -658,10 +640,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC], offset=[1], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -784,10 +763,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime DESC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -826,10 +802,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime DESC], offset=[1], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -868,10 +841,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC, c ASC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -910,10 +880,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC, c ASC], offset=[1], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -952,10 +919,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime DESC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -994,10 +958,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[rowtime ASC, c DESC], offset=[1], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -1036,10 +997,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC, rowtime ASC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -1078,10 +1036,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC, rowtime DESC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -1225,10 +1180,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a])
-+- SortLimit(orderBy=[c ASC], offset=[0], fetch=[0])
-   +- Exchange(distribution=[single])
-      +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
+Values(tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
index 07639da..9f1f894 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
@@ -647,4 +647,41 @@ Union(all=[true], union=[a, b])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSubplanReuseWithDynamicFunction">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalIntersect(all=[false])
+:- LogicalIntersect(all=[false])
+:  :- LogicalProject(random=[$0])
+:  :  +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+:  :     +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+:  :        +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+:  +- LogicalProject(random=[$0])
+:     +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+:        +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+:           +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
++- LogicalProject(random=[$0])
+   +- LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[1])
+      +- LogicalProject(random=[$0], EXPR$1=[RAND()])
+         +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random])
+:- Exchange(distribution=[hash[random]])
+:  +- GroupAggregate(groupBy=[random], select=[random])
+:     +- Exchange(distribution=[hash[random]])
+:        +- Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random])
+:           :- Exchange(distribution=[hash[random]], reuse_id=[1])
+:           :  +- Calc(select=[random])
+:           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1])
+:           :        +- Exchange(distribution=[single])
+:           :           +- Calc(select=[a AS random, RAND() AS EXPR$1])
+:           :              +- TableSourceScan(table=[[x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:           +- Reused(reference_id=[1])
++- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala
index 0cfcfe7..9f5a4e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/LimitTest.scala
@@ -28,7 +28,6 @@ class LimitTest extends TableTestBase {
 
   private val util = batchTestUtil()
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-  // TODO optimize `limit 0`
 
   @Test
   def testLimitWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..635b9b7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.batch.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableConfigOptions, TableException, ValidationException}
+import org.apache.flink.table.plan.util.NonPojo
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+class SetOperatorsTest extends TableTestBase { // batch plan tests: UNION ALL / INTERSECT / EXCEPT
+
+  private val util = batchTestUtil()
+
+  @Before
+  def before(): Unit = { // per-test setup: planner config plus tables T1, T2 and T3
+    util.tableEnv.getConfig.getConf.setString(
+      TableConfigOptions.SQL_EXEC_DISABLED_OPERATORS, "SortAgg") // disable SortAgg
+    util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+    util.addTableSource[(Int, Long, Int, String, Long)]("T3", 'a, 'b, 'd, 'c, 'e)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentColumnSize(): Unit = {
+    // Must fail validation: T1 has 3 columns but T3 has 5.
+    util.verifyPlan("SELECT * FROM T1 UNION ALL SELECT * FROM T3")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldTypes(): Unit = {
+    // Must fail validation: the UNION inputs have different field types.
+    util.verifyPlan("SELECT a, b, c FROM T1 UNION ALL SELECT d, c, e FROM T3")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testIntersectAll(): Unit = { // INTERSECT ALL: a TableException is expected
+    util.verifyPlan("SELECT c FROM T1 INTERSECT ALL SELECT f FROM T2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectDifferentFieldTypes(): Unit = {
+    // Must fail validation: the INTERSECT inputs have different field types.
+    util.verifyPlan("SELECT a, b, c FROM T1 INTERSECT SELECT d, c, e FROM T3")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testMinusAll(): Unit = { // EXCEPT ALL: a TableException is expected
+    util.verifyPlan("SELECT c FROM T1 EXCEPT ALL SELECT f FROM T2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusDifferentFieldTypes(): Unit = {
+    // Must fail validation: the EXCEPT inputs have different field types.
+    util.verifyPlan("SELECT a, b, c FROM T1 EXCEPT SELECT d, c, e FROM T3")
+  }
+
+  @Test
+  def testIntersect(): Unit = { // distinct INTERSECT over two plain scans
+    util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2")
+  }
+
+  @Test
+  def testIntersectLeftIsEmpty(): Unit = { // left input is filtered by the always-false 1=0
+    util.verifyPlan("SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2")
+  }
+
+  @Test
+  def testIntersectRightIsEmpty(): Unit = { // right input is filtered by the always-false 1=0
+    util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0")
+  }
+
+  @Test
+  def testMinus(): Unit = { // distinct EXCEPT over two plain scans
+    util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2")
+  }
+
+  @Test
+  def testMinusLeftIsEmpty(): Unit = { // left input is filtered by the always-false 1=0
+    util.verifyPlan("SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2")
+  }
+
+  @Test
+  def testMinusRightIsEmpty(): Unit = { // right input is filtered by the always-false 1=0
+    util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0")
+  }
+
+  @Test
+  def testMinusWithNestedTypes(): Unit = { // MyTable has a nested tuple and an array column
+    util.addTableSource[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+    util.verifyPlan("SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable")
+  }
+
+  @Test
+  def testUnionNullableTypes(): Unit = { // second branch yields the tuple column b or NULL
+    util.addTableSource[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+    util.verifyPlan("SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A")
+  }
+
+  @Test
+  def testUnionAnyType(): Unit = { // both columns use GenericTypeInfo of NonPojo
+    val util = batchTestUtil() // NOTE(review): shadows the class-level util — confirm intended
+    util.addTableSource("A",
+      Array[TypeInformation[_]](
+          new GenericTypeInfo(classOf[NonPojo]),
+          new GenericTypeInfo(classOf[NonPojo])),
+      Array("a", "b"))
+    util.verifyPlan("SELECT a FROM A UNION ALL SELECT b FROM A")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
index 4ebdf44..4aff8d0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SortLimitTest.scala
@@ -29,7 +29,6 @@ class SortLimitTest extends TableTestBase {
   private val util = batchTestUtil()
   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
   util.tableEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_EXEC_SORT_DEFAULT_LIMIT, 200)
-  // TODO optimize `limit 0`
 
   @Test
   def testNonRangeSortWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
index f85d8de..5de63bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
@@ -368,8 +368,7 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[TableException])
-  // INTERSECT is not supported now
+  @Test
   def testSubplanReuseWithDynamicFunction(): Unit = {
     val sqlQuery = util.tableEnv.sqlQuery(
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.scala
new file mode 100644
index 0000000..60af860
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkLimit0RemoveRuleTest.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Plan tests for [[FlinkLimit0RemoveRule]] on `LIMIT 0` queries, sub-queries and join inputs.
+  */
+class FlinkLimit0RemoveRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = { // replaces the batch optimizer program with a single HEP program
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(
+          FlinkSubQueryRemoveRule.FILTER,
+          FlinkLimit0RemoveRule.INSTANCE))
+        .build()
+    )
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(programs).build()
+    util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+  }
+
+  @Test
+  def testSimpleLimitZero(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable LIMIT 0")
+  }
+
+  @Test
+  def testLimitZeroWithOrderBy(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable ORDER BY a LIMIT 0")
+  }
+
+  @Test
+  def testLimitZeroWithOffset(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable ORDER BY a LIMIT 0 OFFSET 10")
+  }
+
+  @Test
+  def testLimitZeroWithSelect(): Unit = {
+    util.verifyPlan("SELECT * FROM (SELECT a FROM MyTable LIMIT 0)")
+  }
+
+  @Test
+  def testLimitZeroWithIn(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a IN (SELECT a FROM MyTable LIMIT 0)")
+  }
+
+  @Test
+  def testLimitZeroWithNotIn(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a NOT IN (SELECT a FROM MyTable LIMIT 0)")
+  }
+
+  @Test
+  def testLimitZeroWithExists(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE EXISTS (SELECT a FROM MyTable LIMIT 0)")
+  }
+
+  @Test
+  def testLimitZeroWithNotExists(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)")
+  }
+
+  @Test
+  def testLimitZeroWithJoin(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable INNER JOIN (SELECT * FROM MyTable Limit 0) ON TRUE")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
new file mode 100644
index 0000000..3f15f18
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkPruneEmptyRulesTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.{PruneEmptyRules, ReduceExpressionsRule}
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Plan tests for [[FlinkPruneEmptyRules]] using IN / NOT IN sub-queries filtered by `1=0`.
+  */
+class FlinkPruneEmptyRulesTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = { // replaces the batch optimizer program with a single HEP program
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(
+          FlinkSubQueryRemoveRule.FILTER,
+          ReduceExpressionsRule.FILTER_INSTANCE,
+          ReduceExpressionsRule.PROJECT_INSTANCE,
+          PruneEmptyRules.FILTER_INSTANCE,
+          PruneEmptyRules.PROJECT_INSTANCE,
+          FlinkPruneEmptyRules.JOIN_RIGHT_INSTANCE))
+        .build()
+    )
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(programs).build()
+    util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+    util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testSemiJoinRightIsEmpty(): Unit = {
+    util.verifyPlan("SELECT * FROM T1 WHERE a IN (SELECT d FROM T2 WHERE 1=0)")
+  }
+
+  @Test
+  def testAntiJoinRightIsEmpty(): Unit = {
+    util.verifyPlan("SELECT * FROM T1 WHERE a NOT IN (SELECT d FROM T2 WHERE 1=0)")
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala
new file mode 100644
index 0000000..c867de8
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram,
+  FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Plan tests for [[ReplaceIntersectWithSemiJoinRule]] on INTERSECT and INTERSECT ALL queries.
+  */
+class ReplaceIntersectWithSemiJoinRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = { // replaces the batch optimizer program with a single HEP program
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(ReplaceIntersectWithSemiJoinRule.INSTANCE))
+        .build()
+    )
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(programs).build()
+    util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+
+    util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testIntersectAll(): Unit = {
+    util.verifyPlanNotExpected("SELECT c FROM T1 INTERSECT ALL SELECT f FROM T2", "joinType=[semi]")
+  }
+
+  @Test
+  def testIntersect(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2")
+  }
+
+  @Test
+  def testIntersectWithFilter(): Unit = {
+    util.verifyPlan("SELECT c FROM ((SELECT * FROM T1) INTERSECT (SELECT * FROM T2)) WHERE a > 1")
+  }
+
+  @Test
+  def testIntersectLeftIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2")
+  }
+
+  @Test
+  def testIntersectRightIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0")
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
new file mode 100644
index 0000000..f5b6fbe
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Plan tests for [[ReplaceMinusWithAntiJoinRule]] on EXCEPT and EXCEPT ALL queries.
+  */
+class ReplaceMinusWithAntiJoinRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = { // replaces the batch optimizer program with a single HEP program
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(ReplaceMinusWithAntiJoinRule.INSTANCE))
+        .build()
+    )
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(programs).build()
+    util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+
+    util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testExceptAll(): Unit = {
+    util.verifyPlanNotExpected("SELECT c FROM T1 EXCEPT ALL SELECT f FROM T2", "joinType=[anti]")
+  }
+
+  @Test
+  def testExcept(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2")
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
+    util.verifyPlan("SELECT c FROM (SELECT * FROM T1 EXCEPT (SELECT * FROM T2)) WHERE b < 2")
+  }
+
+  @Test
+  def testExceptLeftIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2")
+  }
+
+  @Test
+  def testExceptRightIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0")
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.scala
new file mode 100644
index 0000000..255e2ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/FlinkRewriteSubQueryRuleTest.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical.subquery
+
+import org.apache.flink.api.scala._
+
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[org.apache.flink.table.plan.rules.logical.FlinkRewriteSubQueryRule]].
+  */
+class FlinkRewriteSubQueryRuleTest extends SubQueryTestBase {
+
+  @Before
+  def setup(): Unit = { // registers the outer table x(a, b, c) and the sub-query table y(d, e, f)
+    util.addTableSource[(Int, Long, String)]("x", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("y", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testNotCountStarInScalarQuery(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(e) FROM y WHERE d > 10) > 0")
+  }
+
+  @Test
+  def testNotEmptyGroupByInScalarQuery(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10 GROUP BY f) > 0")
+  }
+
+  @Test
+  def testUnsupportedConversionWithUnexpectedComparisonNumber(): Unit = {
+    // without correlation: the sub-query only filters y (d > 10)
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 1", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 0", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > -1", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE 0 <= (SELECT COUNT(*) FROM y WHERE d > 10)", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE -1 < (SELECT COUNT(*) FROM y WHERE d > 10)", "joinType=[semi]")
+
+    // with correlation: the sub-query compares outer column a with y.d
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 1", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 0", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE 1 < (SELECT COUNT(*) FROM y WHERE a = d)", "joinType=[semi]")
+    util.verifyPlanNotExpected(
+      "SELECT * FROM x WHERE 0 <= (SELECT COUNT(*) FROM y WHERE a = d)", "joinType=[semi]")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation1(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation2(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) > 0.9")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation3(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 1")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation4(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE d > 10) >= 0.1")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation5(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE d > 10)")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation6(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE d > 10)")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation7(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE d > 10)")
+  }
+
+  @Test
+  def testSupportedConversionWithoutCorrelation8(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE d > 10)")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation1(): Unit = {
+    // with correlation: `a` comes from the outer table x, `d` from y
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation2(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) > 0.9")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation3(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 1")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation4(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE (SELECT COUNT(*) FROM y WHERE a = d) >= 0.1")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation5(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 0 < (SELECT COUNT(*) FROM y WHERE a = d)")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation6(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 0.99 < (SELECT COUNT(*) FROM y WHERE a = d)")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation7(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 1 <= (SELECT COUNT(*) FROM y WHERE a = d)")
+  }
+
+  @Test
+  def testSupportedConversionWithCorrelation8(): Unit = {
+    util.verifyPlan("SELECT * FROM x WHERE 0.01 <= (SELECT COUNT(*) FROM y WHERE a = d)")
+  }
+
+  @Test
+  def testSqlFromTpcDsQ41(): Unit = {
+    util.addTableSource[(Int, String, String, String, String, String, String)]("item",
+      'i_manufact_id, 'i_manufact, 'i_product_name, 'i_category, 'i_color, 'i_units, 'i_size)
+    val sqlQuery =
+      """
+        |SELECT DISTINCT (i_product_name)
+        |FROM item i1
+        |WHERE i_manufact_id BETWEEN 738 AND 738 + 40
+        |  AND (SELECT count(*) AS item_cnt
+        |FROM item
+        |WHERE (i_manufact = i1.i_manufact AND
+        |  ((i_category = 'Women' AND
+        |    (i_color = 'powder' OR i_color = 'khaki') AND
+        |    (i_units = 'Ounce' OR i_units = 'Oz') AND
+        |    (i_size = 'medium' OR i_size = 'extra large')
+        |  ) OR
+        |    (i_category = 'Women' AND
+        |      (i_color = 'brown' OR i_color = 'honeydew') AND
+        |      (i_units = 'Bunch' OR i_units = 'Ton') AND
+        |      (i_size = 'N/A' OR i_size = 'small')
+        |    ) OR
+        |    (i_category = 'Men' AND
+        |      (i_color = 'floral' OR i_color = 'deep') AND
+        |      (i_units = 'N/A' OR i_units = 'Dozen') AND
+        |      (i_size = 'petite' OR i_size = 'large')
+        |    ) OR
+        |    (i_category = 'Men' AND
+        |      (i_color = 'light' OR i_color = 'cornflower') AND
+        |      (i_units = 'Box' OR i_units = 'Pound') AND
+        |      (i_size = 'medium' OR i_size = 'extra large')
+        |    ))) OR
+        |  (i_manufact = i1.i_manufact AND
+        |    ((i_category = 'Women' AND
+        |      (i_color = 'midnight' OR i_color = 'snow') AND
+        |      (i_units = 'Pallet' OR i_units = 'Gross') AND
+        |      (i_size = 'medium' OR i_size = 'extra large')
+        |    ) OR
+        |      (i_category = 'Women' AND
+        |        (i_color = 'cyan' OR i_color = 'papaya') AND
+        |        (i_units = 'Cup' OR i_units = 'Dram') AND
+        |        (i_size = 'N/A' OR i_size = 'small')
+        |      ) OR
+        |      (i_category = 'Men' AND
+        |        (i_color = 'orange' OR i_color = 'frosted') AND
+        |        (i_units = 'Each' OR i_units = 'Tbl') AND
+        |        (i_size = 'petite' OR i_size = 'large')
+        |      ) OR
+        |      (i_category = 'Men' AND
+        |        (i_color = 'forest' OR i_color = 'ghost') AND
+        |        (i_units = 'Lb' OR i_units = 'Bundle') AND
+        |        (i_size = 'medium' OR i_size = 'extra large')
+        |      )))) > 0
+        |ORDER BY i_product_name
+        |LIMIT 100
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala
index fe440f4..b98d7b2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/LimitTest.scala
@@ -28,7 +28,6 @@ class LimitTest extends TableTestBase {
 
   private val util = streamTestUtil()
   util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'proctime, 'rowtime)
-  // TODO optimize `limit 0`
 
   @Test
   def testLimitWithoutOffset(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..85f4d40
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.plan.util.NonPojo
+import org.apache.flink.table.util.TableTestBase
+
+import org.junit.{Before, Test}
+
+class SetOperatorsTest extends TableTestBase { // plan tests for UNION ALL / INTERSECT / EXCEPT
+
+  private val util = streamTestUtil()
+
+  @Before
+  def before(): Unit = {
+    util.addTableSource[(Int, Long, String)]("T1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("T2", 'd, 'e, 'f)
+    util.addTableSource[(Int, Long, Int, String, Long)]("T3", 'a, 'b, 'd, 'c, 'e)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentColumnSize(): Unit = {
+    // Must fail: UNION ALL inputs differ in column count (T1 has 3 columns, T3 has 5).
+    util.verifyPlan("SELECT * FROM T1 UNION ALL SELECT * FROM T3")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldTypes(): Unit = {
+    // Must fail: UNION ALL column types differ, (Int, Long, String) vs. (Int, String, Long).
+    util.verifyPlan("SELECT a, b, c FROM T1 UNION ALL SELECT d, c, e FROM T3")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testIntersectAll(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 INTERSECT ALL SELECT f FROM T2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectDifferentFieldTypes(): Unit = {
+    // Must fail: INTERSECT column types differ, (Int, Long, String) vs. (Int, String, Long).
+    util.verifyPlan("SELECT a, b, c FROM T1 INTERSECT SELECT d, c, e FROM T3")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testMinusAll(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 EXCEPT ALL SELECT f FROM T2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusDifferentFieldTypes(): Unit = {
+    // Must fail: EXCEPT column types differ, (Int, Long, String) vs. (Int, String, Long).
+    util.verifyPlan("SELECT a, b, c FROM T1 EXCEPT SELECT d, c, e FROM T3")
+  }
+
+  @Test
+  def testIntersect(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2")
+  }
+
+  @Test
+  def testIntersectLeftIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 WHERE 1=0 INTERSECT SELECT f FROM T2")
+  }
+
+  @Test
+  def testIntersectRightIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 INTERSECT SELECT f FROM T2 WHERE 1=0")
+  }
+
+  @Test
+  def testMinus(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2")
+  }
+
+  @Test
+  def testMinusLeftIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 WHERE 1=0 EXCEPT SELECT f FROM T2")
+  }
+
+  @Test
+  def testMinusRightIsEmpty(): Unit = {
+    util.verifyPlan("SELECT c FROM T1 EXCEPT SELECT f FROM T2 WHERE 1=0")
+  }
+
+  @Test
+  def testMinusWithNestedTypes(): Unit = {
+    util.addTableSource[(Long, (Int, String), Array[Boolean])]("MyTable", 'a, 'b, 'c)
+    util.verifyPlan("SELECT * FROM MyTable EXCEPT SELECT * FROM MyTable")
+  }
+
+  @Test
+  def testUnionNullableTypes(): Unit = {
+    util.addTableSource[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+    util.verifyPlan("SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A")
+  }
+
+  @Test
+  def testUnionAnyType(): Unit = {
+    val util = batchTestUtil() // NOTE(review): shadows the stream util with a batch one; confirm
+    util.addTableSource("A",
+      Array[TypeInformation[_]](
+        new GenericTypeInfo(classOf[NonPojo]),
+        new GenericTypeInfo(classOf[NonPojo])),
+      Array("a", "b"))
+    util.verifyPlan("SELECT a FROM A UNION ALL SELECT b FROM A")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala
index 1633507..285fc46 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SortLimitTest.scala
@@ -27,7 +27,6 @@ class SortLimitTest extends TableTestBase {
 
   private val util = streamTestUtil()
   util.addDataStream[(Int, String, Long)]("MyTable", 'a, 'b, 'c, 'proctime, 'rowtime)
-  // TODO optimize `limit 0`
 
   @Test
   def testSortProcessingTimeAscWithOffSet0AndLimit1(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
index a4db284..7613792 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
@@ -284,8 +284,7 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[TableException])
-  // INTERSECT is not supported now
+  @Test
   def testSubplanReuseWithDynamicFunction(): Unit = {
     val sqlQuery = util.tableEnv.sqlQuery(
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala
index 9704b45..5d2e62e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/pojos.scala
@@ -39,3 +39,13 @@ class MyPojo() {
 
   override def toString = s"MyPojo($f1, $f2)"
 }
+
+class NonPojo { // test type wrapped in GenericTypeInfo by SetOperatorsTest.testUnionAnyType
+  val x = new java.util.HashMap[String, String]()
+
+  override def toString: String = x.toString
+
+  override def hashCode(): Int = super.hashCode() // inherited implementation, not based on x
+
+  override def equals(obj: scala.Any): Boolean = super.equals(obj) // likewise inherited
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
new file mode 100644
index 0000000..def6c29
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/Limit0RemoveITCase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.runtime.utils.BatchTestBase
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData.numericType
+
+import org.junit.{Before, Test}
+
+import java.math.{BigDecimal => JBigDecimal}
+
+import scala.collection.Seq
+
+
+class Limit0RemoveITCase extends BatchTestBase {
+  // Batch-mode result checks for queries whose query or subquery ends in `LIMIT 0`.
+  @Before
+  def before(): Unit = {
+    val numericData = Seq(
+      row(null, 1L, 1.0f, 1.0d, JBigDecimal.valueOf(1)),
+      row(2, null, 2.0f, 2.0d, JBigDecimal.valueOf(2)),
+      row(3, 3L, null, 3.0d, JBigDecimal.valueOf(3)),
+      row(3, 3L, 4.0f, null, JBigDecimal.valueOf(3))
+    )
+    // t1 and t2 are registered from the same rows, with columns a, b, c, d, e.
+    registerCollection("t1", numericData, numericType, "a, b, c, d, e")
+    registerCollection("t2", numericData, numericType, "a, b, c, d, e")
+  }
+
+  @Test
+  def testSimpleLimitRemove(): Unit = {
+    val sqlQuery = "SELECT * FROM t1 LIMIT 0"
+    checkResult(sqlQuery, Seq())
+  }
+
+  @Test
+  def testLimitRemoveWithOrderBy(): Unit = {
+    val sqlQuery = "SELECT * FROM t1 ORDER BY a LIMIT 0"
+    checkResult(sqlQuery, Seq())
+  }
+
+  @Test
+  def testLimitRemoveWithJoin(): Unit = {
+    val sqlQuery = "SELECT * FROM t1 JOIN (SELECT * FROM t2 LIMIT 0) ON true"
+    checkResult(sqlQuery, Seq())
+  }
+
+  @Test
+  def testLimitRemoveWithIn(): Unit = {
+    val sqlQuery = "SELECT * FROM t1 WHERE a IN (SELECT a FROM t2 LIMIT 0)"
+    checkResult(sqlQuery, Seq())
+  }
+
+  @Test
+  def testLimitRemoveWithNotIn(): Unit = {
+    val sqlQuery = "SELECT a FROM t1 WHERE a NOT IN (SELECT a FROM t2 LIMIT 0)"
+    checkResult(sqlQuery, Seq(row(2), row(3), row(3), row(null)))
+  }
+
+  @Test(expected = classOf[TableException])
+  // TODO remove exception after translateToPlanInternal is implemented in BatchExecNestedLoopJoin
+  def testLimitRemoveWithExists(): Unit = {
+    val sqlQuery = "SELECT * FROM t1 WHERE EXISTS (SELECT a FROM t2 LIMIT 0)"
+    checkResult(sqlQuery, Seq())
+  }
+  // Selects only column `a` so that the single-column expected rows match the query's output.
+  @Test(expected = classOf[TableException])
+  // TODO remove exception after translateToPlanInternal is implemented in BatchExecNestedLoopJoin
+  def testLimitRemoveWithNotExists(): Unit = {
+    val sqlQuery = "SELECT a FROM t1 WHERE NOT EXISTS (SELECT a FROM t2 LIMIT 0)"
+    checkResult(sqlQuery, Seq(row(2), row(3), row(3), row(null)))
+  }
+
+  @Test
+  def testLimitRemoveWithSelect(): Unit = {
+    val sqlQuery = "SELECT * FROM (SELECT a FROM t2 LIMIT 0)"
+    checkResult(sqlQuery, Seq())
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
new file mode 100644
index 0000000..6a402d5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableConfigOptions, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendTableSink, TestingUpsertTableSink}
+
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+class Limit0RemoveITCase extends StreamingTestBase() {
+  // Streaming-mode result checks for queries whose query or subquery ends in `LIMIT 0`.
+  @Before
+  def setup(): Unit = {
+    tEnv.getConfig.getConf.setBoolean(TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED, true)
+  }
+
+  @Test
+  def testSimpleLimitRemove(): Unit = {
+    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table = ds.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable", table)
+
+    val sql = "SELECT * FROM MyTable LIMIT 0"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // LIMIT 0 must produce no rows at all.
+    assertEquals(0, sink.getAppendResults.size)
+  }
+
+  @Test
+  def testLimitRemoveWithOrderBy(): Unit = {
+    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table = ds.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable", table)
+
+    val sql = "SELECT * FROM MyTable ORDER BY a LIMIT 0"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // Ordering makes no difference: LIMIT 0 still produces no rows.
+    assertEquals(0, sink.getAppendResults.size)
+  }
+
+  @Test
+  def testLimitRemoveWithSelect(): Unit = {
+    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table = ds.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable", table)
+
+    val sql = "select a2 from (select cast(a as int) a2 from MyTable limit 0)"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // The outer projection over an empty LIMIT 0 subquery is empty as well.
+    assertEquals(0, sink.getAppendResults.size)
+  }
+
+  @Test
+  def testLimitRemoveWithIn(): Unit = {
+    val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table1 = ds1.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable1", table1)
+    // MyTable2 is backed by its own, smaller data set.
+    val ds2 = env.fromCollection(Seq(1, 2, 3))
+    val table2 = ds2.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable2", table2)
+
+    val sql = "SELECT * FROM MyTable1 WHERE a IN (SELECT a FROM MyTable2 LIMIT 0)"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // IN over an empty subquery matches nothing.
+    assertEquals(0, sink.getAppendResults.size)
+  }
+
+  @Test
+  def testLimitRemoveWithNotIn(): Unit = {
+    val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table1 = ds1.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable1", table1)
+    // MyTable2 is backed by its own, smaller data set.
+    val ds2 = env.fromCollection(Seq(1, 2, 3))
+    val table2 = ds2.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable2", table2)
+
+    val sql = "SELECT * FROM MyTable1 WHERE a NOT IN (SELECT a FROM MyTable2 LIMIT 0)"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // NOT IN over an empty subquery keeps every row of MyTable1.
+    val expected = Seq("1", "2", "3", "4", "5", "6")
+    assertEquals(expected, sink.getAppendResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin
+  def testLimitRemoveWithExists(): Unit = {
+    val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table1 = ds1.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable1", table1)
+    // MyTable2 is backed by its own, smaller data set.
+    val ds2 = env.fromCollection(Seq(1, 2, 3))
+    val table2 = ds2.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable2", table2)
+
+    val sql = "SELECT * FROM MyTable1 WHERE EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // EXISTS over an empty subquery holds for no row.
+    assertEquals(0, sink.getRawResults.size)
+  }
+
+  @Test(expected = classOf[TableException])
+  // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin
+  def testLimitRemoveWithNotExists(): Unit = {
+    val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table1 = ds1.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable1", table1)
+    // MyTable2 is backed by its own, smaller data set.
+    val ds2 = env.fromCollection(Seq(1, 2, 3))
+    val table2 = ds2.toTable(tEnv, 'a)
+    tEnv.registerTable("MyTable2", table2)
+
+    val sql = "SELECT * FROM MyTable1 WHERE NOT EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // NOT EXISTS over an empty subquery keeps every row of MyTable1.
+    val expected = Seq("1", "2", "3", "4", "5", "6")
+    assertEquals(expected, sink.getUpsertResults.sorted)
+  }
+
+  @Test
+  def testLimitRemoveWithJoin(): Unit = {
+    val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
+    val table1 = ds1.toTable(tEnv, 'a1)
+    tEnv.registerTable("MyTable1", table1)
+    // MyTable2 is backed by its own, smaller data set.
+    val ds2 = env.fromCollection(Seq(1, 2, 3))
+    val table2 = ds2.toTable(tEnv, 'a2)
+    tEnv.registerTable("MyTable2", table2)
+
+    val sql = "SELECT a1 FROM MyTable1 INNER JOIN (SELECT a2 FROM MyTable2 LIMIT 0) ON true"
+
+    val result = tEnv.sqlQuery(sql)
+    val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink())
+    tEnv.writeToSink(result, sink)
+    tEnv.execute()
+    // An inner join against an empty LIMIT 0 side yields no rows.
+    assertEquals(0, sink.getAppendResults.size)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index 1839ba6..4f71696 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -249,7 +249,7 @@ final class TestingUpsertSink(keys: Array[Int], tz: TimeZone)
   }
 }
 
-final class TestingUpsertTableSink(keys: Array[Int], tz: TimeZone)
+final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
   extends UpsertStreamTableSink[BaseRow] {
   var fNames: Array[String] = _
   var fTypes: Array[TypeInformation[_]] = _
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
index 958c264..b1affec 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TestSinkUtil.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.utils
 
 import org.apache.flink.table.`type`.TypeConverters.createExternalTypeInfoFromInternalType
-import org.apache.flink.table.api.{Table, TableImpl}
+import org.apache.flink.table.api.{Table, TableException, TableImpl}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.dataformat.GenericRow
 import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
@@ -40,9 +40,17 @@ object TestSinkUtil {
     val rowType = table.asInstanceOf[TableImpl].getRelNode.getRowType
     val fieldNames = rowType.getFieldNames.asScala.toArray
     val fieldTypes = rowType.getFieldList.asScala
-        .map(field => FlinkTypeFactory.toInternalType(field.getType))
-        .map(createExternalTypeInfoFromInternalType).toArray
-    new TestingAppendTableSink().configure(fieldNames, fieldTypes).asInstanceOf[T]
+      .map(field => FlinkTypeFactory.toInternalType(field.getType))
+      .map(createExternalTypeInfoFromInternalType).toArray
+    sink match {
+      case _: TestingAppendTableSink =>
+        new TestingAppendTableSink().configure(fieldNames, fieldTypes).asInstanceOf[T]
+      case s: TestingUpsertTableSink =>
+        new TestingUpsertTableSink(s.keys, s.tz).configure(fieldNames, fieldTypes).asInstanceOf[T]
+      case _: TestingRetractTableSink =>
+        new TestingRetractTableSink().configure(fieldNames, fieldTypes).asInstanceOf[T]
+      case _ => throw new TableException(s"Unsupported sink: $sink")
+    }
   }
 
   def fieldToString(field: Any, tz: TimeZone): String = {