You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/22 01:14:57 UTC

[flink] branch master updated: [FLINK-12509] [table-planner-blink] Introduce planner rules about non semi/anti join, which includes:

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b36c54d  [FLINK-12509] [table-planner-blink] Introduce planner rules about non semi/anti join, which includes:
b36c54d is described below

commit b36c54dd06122b7ddf10048ca72f1b5e448b4cd2
Author: godfrey he <go...@163.com>
AuthorDate: Wed May 22 09:14:46 2019 +0800

    [FLINK-12509] [table-planner-blink] Introduce planner rules about non semi/anti join, which includes:
    
    1. `JoinConditionEqualityTransferRule` that converts conditions to the left or right table's own independent filter as much as possible.
    
    2. `JoinConditionTypeCoerceRule` that coerces both sides of the EQUALS(`=`) operator in the Join condition to the same type, ignoring nullability.
    
    3. `JoinDependentConditionDerivationRule` that extracts some sub-conditions in the Join OR condition which can be pushed into join's inputs further by FlinkFilterJoinRule.
    
    4. `JoinDeriveNullFilterRule` that filters null values before join if the count of null values exceeds some threshold.
    
    This closes #8440
---
 .../flink/table/api/PlannerConfigOptions.java      |   7 +
 .../plan/rules/logical/FlinkFilterJoinRule.java    |   7 +-
 .../plan/optimize/program/FlinkBatchProgram.scala  |  29 +-
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  33 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  26 +-
 .../JoinConditionEqualityTransferRule.scala        | 172 ++++++++
 .../logical/JoinConditionTypeCoerceRule.scala      | 128 ++++++
 .../JoinDependentConditionDerivationRule.scala     | 145 +++++++
 .../rules/logical/JoinDeriveNullFilterRule.scala   |  94 +++++
 .../rules/logical/SimplifyJoinConditionRule.scala  |  70 ++++
 .../sql/join/BroadcastHashSemiAntiJoinTest.xml     |  13 +-
 .../plan/batch/sql/join/NestedLoopJoinTest.xml     |  52 +--
 .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml  |  41 +-
 .../table/plan/batch/sql/join/SemiAntiJoinTest.xml |  43 +-
 .../plan/batch/sql/join/ShuffledHashJoinTest.xml   |  27 ++
 .../sql/join/ShuffledHashSemiAntiJoinTest.xml      |  15 +-
 .../plan/batch/sql/join/SortMergeJoinTest.xml      |  53 +--
 .../batch/sql/join/SortMergeSemiAntiJoinTest.xml   |  15 +-
 .../plan/rules/logical/FlinkFilterJoinRuleTest.xml | 435 +++++++++++++++++++++
 .../JoinConditionEqualityTransferRuleTest.xml      | 361 +++++++++++++++++
 .../logical/JoinConditionTypeCoerceRuleTest.xml    | 266 +++++++++++++
 .../JoinDependentConditionDerivationRuleTest.xml   | 208 ++++++++++
 .../rules/logical/JoinDeriveNullFilterRuleTest.xml | 212 ++++++++++
 .../logical/SimplifyJoinConditionRuleTest.xml      |  75 ++++
 .../logical/subquery/SubQuerySemiJoinTest.xml      | 167 ++++----
 .../plan/stream/sql/join/SemiAntiJoinTest.xml      |  30 +-
 .../table/plan/stream/sql/join/WindowJoinTest.xml  |  20 +-
 .../batch/sql/join/BroadcastHashJoinTest.scala     |  14 +-
 .../table/plan/batch/sql/join/JoinTestBase.scala   |  16 +-
 .../table/plan/batch/sql/join/LookupJoinTest.scala |   9 +-
 .../plan/batch/sql/join/ShuffledHashJoinTest.scala |   7 -
 .../rules/logical/FlinkFilterJoinRuleTest.scala    | 157 ++++++++
 .../JoinConditionEqualityTransferRuleTest.scala    | 144 +++++++
 .../logical/JoinConditionTypeCoerceRuleTest.scala  | 114 ++++++
 .../JoinDependentConditionDerivationRuleTest.scala | 118 ++++++
 .../logical/JoinDeriveNullFilterRuleTest.scala     | 117 ++++++
 .../logical/SimplifyJoinConditionRuleTest.scala    |  66 ++++
 .../plan/stream/sql/join/LookupJoinTest.scala      |   2 +
 .../apache/flink/table/util/TableTestBase.scala    |  56 ++-
 39 files changed, 3293 insertions(+), 271 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index 30dc16f..c2bdf85 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -79,6 +79,13 @@ public class PlannerConfigOptions {
 					.withDescription("When the semi-side of semi/anti join can distinct a lot of data in advance," +
 							" we will add distinct node before semi/anti join.");
 
+	public static final ConfigOption<Long> SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD =
+			key("sql.optimizer.join.null.filter.threshold")
+					.defaultValue(2000000L)
+					.withDescription("If the source of InnerJoin has nullCount more than this value, " +
+							"it will add a null filter (possibly be pushDowned) before the join, filter" +
+							" null values to avoid the impact of null values on the single join node.");
+
 	public static final ConfigOption<Boolean> SQL_OPTIMIZER_DATA_SKEW_DISTINCT_AGG_ENABLED =
 			key("sql.optimizer.data-skew.distinct-agg.enabled")
 					.defaultValue(false)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index dd5be0f..3b593de 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -48,7 +48,7 @@ import static org.apache.calcite.plan.RelOptUtil.conjunctions;
  * This rules is copied from Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule}.
  * Modification:
  * - Use `FlinkRelOptUtil.classifyFilters` to support SEMI/ANTI join
- * - TODO Handles the ON condition of anti-join can not be pushed down
+ * - Handles the case that the ON condition of anti-join can not be pushed down
  */
 
 /**
@@ -192,10 +192,13 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 			}
 		}
 
+		boolean isAntiJoin = joinType == JoinRelType.ANTI;
+
 		// Try to push down filters in ON clause. A ON clause filter can only be
 		// pushed down if it does not affect the non-matching set, i.e. it is
 		// not on the side which is preserved.
-		if (FlinkRelOptUtil.classifyFilters(
+		// A ON clause filter of anti-join can not be pushed down.
+		if (!isAntiJoin && FlinkRelOptUtil.classifyFilters(
 				join,
 				joinFilters,
 				joinType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 5f03e5d..0191d79 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -33,6 +33,7 @@ object FlinkBatchProgram {
   val DECORRELATE = "decorrelate"
   val DEFAULT_REWRITE = "default_rewrite"
   val PREDICATE_PUSHDOWN = "predicate_pushdown"
+  val JOIN_REWRITE = "join_rewrite"
   val WINDOW = "window"
   val LOGICAL = "logical"
   val LOGICAL_REWRITE = "logical_rewrite"
@@ -105,11 +106,20 @@ object FlinkBatchProgram {
       PREDICATE_PUSHDOWN,
       FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
         .addProgram(
-          FlinkHepRuleSetProgramBuilder.newBuilder
-            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
-            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-            .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
-            .build(), "filter rules")
+          FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+            .addProgram(
+              FlinkHepRuleSetProgramBuilder.newBuilder
+                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                .add(FlinkBatchRuleSets.JOIN_PREDICATE_REWRITE_RULES)
+                .build(), "join predicate rewrite")
+            .addProgram(
+              FlinkHepRuleSetProgramBuilder.newBuilder
+                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
+                .build(), "other predicate rewrite")
+            .setIterations(5).build())
         .addProgram(
           FlinkHepRuleSetProgramBuilder.newBuilder
             .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
@@ -118,6 +128,15 @@ object FlinkBatchProgram {
             .build(), "prune empty after predicate push down")
         .build())
 
+    // join rewrite
+    chainedProgram.addLast(
+      JOIN_REWRITE,
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(FlinkBatchRuleSets.JOIN_COND_EQUAL_TRANSFER_RULES)
+        .build())
+
     // window rewrite
     chainedProgram.addLast(
       WINDOW,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index fa48ecf..bf6f247 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -35,6 +35,7 @@ object FlinkBatchRuleSets {
   val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
     SimplifyFilterConditionRule.EXTENDED,
     FlinkSubQueryRemoveRule.FILTER,
+    JoinConditionTypeCoerceRule.INSTANCE,
     FlinkJoinPushExpressionsRule.INSTANCE
   )
 
@@ -88,10 +89,21 @@ object FlinkBatchRuleSets {
   )
 
   /**
+    * RuleSet to simplify predicate expressions in filters and joins
+    */
+  private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
+    SimplifyFilterConditionRule.INSTANCE,
+    SimplifyJoinConditionRule.INSTANCE,
+    JoinConditionTypeCoerceRule.INSTANCE,
+    JoinPushExpressionsRule.INSTANCE
+  )
+
+  /**
     * RuleSet to normalize plans for batch
     */
   val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
-    REWRITE_COALESCE_RULES.asScala ++
+    PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
+      REWRITE_COALESCE_RULES.asScala ++
       REDUCE_EXPRESSION_RULES.asScala ++
       List(
         // Transform window to LogicalWindowAggregate
@@ -123,12 +135,9 @@ object FlinkBatchRuleSets {
     FilterMergeRule.INSTANCE
   )
 
-  /**
-    * Ruleset to simplify expressions
-    */
-  private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
-    // TODO: add filter simply and join condition simplify rules
-    JoinPushExpressionsRule.INSTANCE
+  val JOIN_PREDICATE_REWRITE_RULES: RuleSet = RuleSets.ofList(
+    JoinDependentConditionDerivationRule.INSTANCE,
+    JoinDeriveNullFilterRule.INSTANCE
   )
 
   /**
@@ -136,7 +145,7 @@ object FlinkBatchRuleSets {
     */
   val FILTER_PREPARE_RULES: RuleSet = RuleSets.ofList((
     FILTER_RULES.asScala
-      // simplify expressions
+      // simplify predicate expressions in filters and joins
       ++ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala
       // reduce expressions in filters and joins
       ++ REDUCE_EXPRESSION_RULES.asScala
@@ -162,7 +171,7 @@ object FlinkBatchRuleSets {
   val PROJECT_RULES: RuleSet = RuleSets.ofList(
     // push a projection past a filter
     ProjectFilterTransposeRule.INSTANCE,
-    // push a projection to the children of a join
+    // push a projection to the children of a non semi/anti join
     // push all expressions to handle the time indicator correctly
     new FlinkProjectJoinTransposeRule(
       PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER),
@@ -189,6 +198,12 @@ object FlinkBatchRuleSets {
     WindowPropertiesRules.WINDOW_PROPERTIES_HAVING_RULE
   )
 
+  val JOIN_COND_EQUAL_TRANSFER_RULES: RuleSet = RuleSets.ofList((
+    RuleSets.ofList(JoinConditionEqualityTransferRule.INSTANCE).asScala ++
+      PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
+      FILTER_RULES.asScala
+    ).asJava)
+
   /**
     * RuleSet to do logical optimize.
     * This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 1aacac8..cbab9e3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -35,6 +35,7 @@ object FlinkStreamRuleSets {
   val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
     SimplifyFilterConditionRule.EXTENDED,
     FlinkSubQueryRemoveRule.FILTER,
+    JoinConditionTypeCoerceRule.INSTANCE,
     FlinkJoinPushExpressionsRule.INSTANCE
   )
 
@@ -88,10 +89,21 @@ object FlinkStreamRuleSets {
   )
 
   /**
+    * RuleSet to simplify predicate expressions in filters and joins
+    */
+  private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
+    SimplifyFilterConditionRule.INSTANCE,
+    SimplifyJoinConditionRule.INSTANCE,
+    JoinConditionTypeCoerceRule.INSTANCE,
+    JoinPushExpressionsRule.INSTANCE
+  )
+
+  /**
     * RuleSet to normalize plans for stream
     */
   val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
-    REWRITE_COALESCE_RULES.asScala ++
+    PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
+      REWRITE_COALESCE_RULES.asScala ++
       REDUCE_EXPRESSION_RULES.asScala ++
       List(
         StreamLogicalWindowAggregateRule.INSTANCE,
@@ -128,19 +140,11 @@ object FlinkStreamRuleSets {
   )
 
   /**
-    * Ruleset to simplify expressions
-    */
-  private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
-    // TODO: add filter simply and join condition simplify rules
-    FlinkJoinPushExpressionsRule.INSTANCE
-  )
-
-  /**
     * RuleSet to do predicate pushdown
     */
   val FILTER_PREPARE_RULES: RuleSet = RuleSets.ofList((
     FILTER_RULES.asScala
-      // simplify expressions
+      // simplify predicate expressions in filters and joins
       ++ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala
       // reduce expressions in filters and joins
       ++ REDUCE_EXPRESSION_RULES.asScala
@@ -165,7 +169,7 @@ object FlinkStreamRuleSets {
   val PROJECT_RULES: RuleSet = RuleSets.ofList(
     // push a projection past a filter
     ProjectFilterTransposeRule.INSTANCE,
-    // push a projection to the children of a join
+    // push a projection to the children of a non semi/anti join
     // push all expressions to handle the time indicator correctly
     new FlinkProjectJoinTransposeRule(
       PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRule.scala
new file mode 100644
index 0000000..910f127
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRule.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.EQUALS
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule that converts Join's conditions to the left or right table's own
+  * independent filter as much as possible, so that the filter-push-down rules can push
+  * the filter down into the join inputs.
+  *
+  * <p>e.g. join condition: l_a = r_b and l_a = r_c.
+  * The l_a is a field from left input, both r_b and r_c are fields from the right input.
+  * After rewrite, condition will be: l_a = r_b and r_b = r_c.
+  * r_b = r_c can be pushed down to the right input.
+  */
+class JoinConditionEqualityTransferRule extends RelOptRule(
+  operand(classOf[Join], any),
+  "JoinConditionEqualityTransferRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: Join = call.rel(0)
+    val joinType = join.getJoinType
+    if (joinType != JoinRelType.INNER && joinType != JoinRelType.SEMI) {
+      return false
+    }
+
+    val (optimizableFilters, _) = partitionJoinFilters(join)
+    val groups = getEquiFilterRelationshipGroup(optimizableFilters)
+    groups.exists(_.size > 2)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: Join = call.rel(0)
+    val (optimizableFilters, remainFilters) = partitionJoinFilters(join)
+    val (equiFiltersToOpt, equiFiltersNotOpt) =
+      getEquiFilterRelationshipGroup(optimizableFilters).partition(_.size > 2)
+
+    val builder = call.builder()
+    val rexBuilder = builder.getRexBuilder
+    val newEquiJoinFilters = mutable.ListBuffer[RexNode]()
+
+    // add equiFiltersNotOpt.
+    equiFiltersNotOpt.foreach { refs =>
+      require(refs.size == 2)
+      newEquiJoinFilters += rexBuilder.makeCall(EQUALS, refs.head, refs.last)
+    }
+
+    // new opt filters.
+    equiFiltersToOpt.foreach { refs =>
+      // partition the InputRefs into left-side refs and right-side refs.
+      val (leftRefs, rightRefs) = refs.partition(fromJoinLeft(join, _))
+      val rexCalls = new mutable.ArrayBuffer[RexNode]()
+
+      // equals for each other.
+      rexCalls ++= makeCalls(rexBuilder, leftRefs)
+      rexCalls ++= makeCalls(rexBuilder, rightRefs)
+
+      // equals for left and right.
+      if (leftRefs.nonEmpty && rightRefs.nonEmpty) {
+        rexCalls += rexBuilder.makeCall(EQUALS, leftRefs.head, rightRefs.head)
+      }
+
+      // add to newEquiJoinFilters (plain append; no deduplication is done here).
+      rexCalls.foreach(call => newEquiJoinFilters += call)
+    }
+
+    val newJoinFilter = builder.and(remainFilters :+
+      FlinkRexUtil.simplify(rexBuilder, builder.and(newEquiJoinFilters)))
+    val newJoin = join.copy(
+      join.getTraitSet,
+      newJoinFilter,
+      join.getLeft,
+      join.getRight,
+      join.getJoinType,
+      join.isSemiJoinDone)
+
+    call.transformTo(newJoin)
+  }
+
+  /**
+    * Returns true if the given input ref is from join left, else false.
+    */
+  private def fromJoinLeft(join: Join, ref: RexInputRef): Boolean = {
+    require(join.getSystemFieldList.size() == 0)
+    ref.getIndex < join.getLeft.getRowType.getFieldCount
+  }
+
+  /**
+    * Partition join condition to leftRef-rightRef equals and others.
+    */
+  def partitionJoinFilters(join: Join): (Seq[RexNode], Seq[RexNode]) = {
+    val conjunctions = RelOptUtil.conjunctions(join.getCondition)
+    conjunctions.partition {
+      case call: RexCall if call.isA(SqlKind.EQUALS) =>
+        (call.operands.head, call.operands.last) match {
+          case (ref1: RexInputRef, ref2: RexInputRef) =>
+            val isLeft1 = fromJoinLeft(join, ref1)
+            val isLeft2 = fromJoinLeft(join, ref2)
+            isLeft1 != isLeft2
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+  /**
+    * Put fields to a group that have equivalence relationships.
+    */
+  def getEquiFilterRelationshipGroup(equiJoinFilters: Seq[RexNode]): Seq[Seq[RexInputRef]] = {
+    val filterSets = mutable.ArrayBuffer[mutable.HashSet[RexInputRef]]()
+    equiJoinFilters.foreach {
+      case call: RexCall =>
+        require(call.isA(SqlKind.EQUALS))
+        val left = call.operands.head.asInstanceOf[RexInputRef]
+        val right = call.operands.last.asInstanceOf[RexInputRef]
+        val set = filterSets.find(set => set.contains(left) || set.contains(right)) match {
+          case Some(s) => s
+          case None =>
+            val s = new mutable.HashSet[RexInputRef]()
+            filterSets += s
+            s
+        }
+        set += left
+        set += right
+    }
+
+    filterSets.map(_.toSeq)
+  }
+
+  /**
+    * Makes EQUALS calls between the first inputRef and each of the others, so all are related.
+    */
+  def makeCalls(rexBuilder: RexBuilder, nodes: Seq[RexInputRef]): Seq[RexNode] = {
+    val calls = new mutable.ArrayBuffer[RexNode]()
+    if (nodes.length > 1) {
+      val rex = nodes.head
+      nodes.drop(1).foreach(calls += rexBuilder.makeCall(EQUALS, rex, _))
+    }
+    calls
+  }
+}
+
+object JoinConditionEqualityTransferRule {
+  val INSTANCE = new JoinConditionEqualityTransferRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRule.scala
new file mode 100644
index 0000000..8e98c15
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRule.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.Join
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule that coerces both sides of the EQUALS(`=`) operator in a Join condition
+  * to the same type, ignoring nullability.
+  *
+  * <p>For most cases, we already did the type coercion during type validation by implicit
+  * type coercion or during sqlNode to relNode conversion, this rule just does a rechecking
+  * to ensure a strongly uniform equals type, so that during a HashJoin shuffle we can have
+  * the same hashcode of the same value.
+  */
+class JoinConditionTypeCoerceRule extends RelOptRule(
+  operand(classOf[Join], any),
+  "JoinConditionTypeCoerceRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: Join = call.rel(0)
+    if (join.getCondition.isAlwaysTrue) {
+      return false
+    }
+    val typeFactory = call.builder().getTypeFactory
+    hasEqualsRefsOfDifferentTypes(typeFactory, join.getCondition)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: Join = call.rel(0)
+    val builder = call.builder()
+    val rexBuilder = builder.getRexBuilder
+    val typeFactory = builder.getTypeFactory
+
+    val newJoinFilters = mutable.ArrayBuffer[RexNode]()
+    val joinFilters = RelOptUtil.conjunctions(join.getCondition)
+    joinFilters.foreach {
+      case c: RexCall if c.isA(SqlKind.EQUALS) =>
+        (c.operands.head, c.operands.last) match {
+          case (ref1: RexInputRef, ref2: RexInputRef)
+            if !SqlTypeUtil.equalSansNullability(
+              typeFactory,
+              ref1.getType,
+              ref2.getType) =>
+            val refList = ref1 :: ref2 :: Nil
+            val targetType = typeFactory.leastRestrictive(refList.map(ref => ref.getType))
+            if (targetType == null) {
+              throw new TableException(
+                s"${ref1.getType} and ${ref2.getType} does not have common type now")
+            }
+            newJoinFilters += builder.equals(
+              rexBuilder.ensureType(targetType, ref1, true),
+              rexBuilder.ensureType(targetType, ref2, true))
+          case _ =>
+            newJoinFilters += c
+        }
+      case r: RexNode =>
+        newJoinFilters += r
+    }
+
+    val newCondExp = builder.and(
+      FlinkRexUtil.simplify(rexBuilder, builder.and(newJoinFilters)))
+
+    val newJoin = join.copy(
+      join.getTraitSet,
+      newCondExp,
+      join.getLeft,
+      join.getRight,
+      join.getJoinType,
+      join.isSemiJoinDone)
+
+    call.transformTo(newJoin)
+  }
+
+  /**
+    * Returns true if two input refs of an equal call have different types in join condition,
+    * else false.
+    */
+  private def hasEqualsRefsOfDifferentTypes(
+      typeFactory: RelDataTypeFactory,
+      predicate: RexNode): Boolean = {
+    val conjunctions = RelOptUtil.conjunctions(predicate)
+    conjunctions.exists {
+      case c: RexCall if c.isA(SqlKind.EQUALS) =>
+        (c.operands.head, c.operands.last) match {
+          case (ref1: RexInputRef, ref2: RexInputRef) =>
+            !SqlTypeUtil.equalSansNullability(
+              typeFactory,
+              ref1.getType,
+              ref2.getType)
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+}
+
+object JoinConditionTypeCoerceRule {
+  val INSTANCE = new JoinConditionTypeCoerceRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRule.scala
new file mode 100644
index 0000000..2222991
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRule.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner Rule that extracts some sub-conditions in the Join OR condition that can be pushed
+  * into join inputs by [[FlinkFilterJoinRule]].
+  *
+  * <p>For example, there is a join query (table A join table B):
+  * {{{
+  * SELECT * FROM A, B WHERE A.f1 = B.f1 AND ((A.f2 = 'aaa1' AND B.f2 = 'bbb1') OR
+  * (A.f2 = 'aaa2' AND B.f2 = 'bbb2'))
+  * }}}
+  *
+  * <p>Hence the query rewards optimizers that can analyze complex join conditions which cannot be
+  * pushed below the join, but still derive filters from such join conditions. It could immediately
+  * filter the scan(A) with the condition: (A.f2 = 'aaa1' OR A.f2 = 'aaa2').
+  *
+  * <p>After join condition dependent optimization, the query will be:
+  * {{{
+  * SELECT * FROM A, B WHERE A.f1 = B.f1 AND
+  * ((A.f2 = 'aaa1' AND B.f2 = 'bbb1') OR (A.f2 = 'aaa2' AND B.f2 = 'bbb2'))
+  * AND (A.f2 = 'aaa1' OR A.f2 = 'aaa2') AND (B.f2 = 'bbb1' OR B.f2 = 'bbb2')
+  * }}}.
+  *
+  * <p>Note: This class can only be used in HepPlanner with RULE_SEQUENCE.
+  */
+class JoinDependentConditionDerivationRule
+  extends RelOptRule(
+    operand(classOf[LogicalJoin], any()),
+    "JoinDependentConditionDerivationRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: LogicalJoin = call.rel(0)
+    // TODO supports more join type
+    join.getJoinType == JoinRelType.INNER
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: LogicalJoin = call.rel(0)
+    val conjunctions = RelOptUtil.conjunctions(join.getCondition)
+
+    val builder = call.builder()
+    val additionalConditions = new mutable.ArrayBuffer[RexNode]
+
+    // and
+    conjunctions.foreach { conjunctionRex =>
+      val disjunctions = RelOptUtil.disjunctions(conjunctionRex)
+
+      // Split by OR: (A.f2 = 'aaa1' and B.f2 = 'bbb1') or (A.f2 = 'aaa2' and B.f2 = 'bbb2')
+      if (disjunctions.size() > 1) {
+
+        val leftDisjunctions = new mutable.ArrayBuffer[RexNode]
+        val rightDisjunctions = new mutable.ArrayBuffer[RexNode]
+        disjunctions.foreach { disjunctionRex =>
+
+          val leftConjunctions = new mutable.ArrayBuffer[RexNode]
+          val rightConjunctions = new mutable.ArrayBuffer[RexNode]
+
+          // Split by AND: A.f2 = 'aaa1' and B.f2 = 'bbb1'
+          RelOptUtil.conjunctions(disjunctionRex).foreach { cond =>
+            val rCols = RelOptUtil.InputFinder.bits(cond).map(_.intValue())
+
+            // May have multi conditions, eg: A.f2 = 'aaa1' and A.f3 = 'aaa3' and B.f2 = 'bbb1'
+            if (rCols.forall(fromJoinLeft(join, _))) {
+              leftConjunctions += cond
+            } else if (rCols.forall(!fromJoinLeft(join, _))) {
+              rightConjunctions += cond
+            }
+          }
+
+          // The result of builder.and is TRUE if the list of conjunction conditions is empty.
+          leftDisjunctions += builder.and(leftConjunctions)
+          rightDisjunctions += builder.and(rightConjunctions)
+        }
+
+        // TODO Consider whether it is worth doing a filter if we have histogram.
+        if (leftDisjunctions.nonEmpty) {
+          additionalConditions += builder.or(leftDisjunctions)
+        }
+        if (rightDisjunctions.nonEmpty) {
+          additionalConditions += builder.or(rightDisjunctions)
+        }
+      }
+    }
+
+    if (additionalConditions.nonEmpty) {
+      val newCondExp = FlinkRexUtil.simplify(
+        builder.getRexBuilder,
+        builder.and(conjunctions ++ additionalConditions))
+
+      if (!newCondExp.toString.equals(join.getCondition.toString)) {
+        val newJoin = join.copy(
+          join.getTraitSet,
+          newCondExp,
+          join.getLeft,
+          join.getRight,
+          join.getJoinType,
+          join.isSemiJoinDone)
+
+        call.transformTo(newJoin)
+      }
+    }
+  }
+
+  /**
+    * Returns true if the given index is from join left, else false.
+    */
+  private def fromJoinLeft(join: Join, index: Int): Boolean = {
+    require(join.getSystemFieldList.size() == 0)
+    index < join.getLeft.getRowType.getFieldCount
+  }
+
+}
+
+object JoinDependentConditionDerivationRule {
+  val INSTANCE = new JoinDependentConditionDerivationRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRule.scala
new file mode 100644
index 0000000..3143bdf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRule.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.PlannerConfigOptions
+import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableIntList
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule that puts an IS NOT NULL filter on the join keys of an inner join's inputs when
+  * the null count of a key column is greater than
+  * [[PlannerConfigOptions.SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD]].
+  *
+  * A null join key can never match in an inner join, and too many null keys lead to single-point
+  * skew, so rows with such keys are filtered out below the join.
+  */
+class JoinDeriveNullFilterRule
+  extends RelOptRule(
+    operand(classOf[LogicalJoin], any()),
+    "JoinDeriveNullFilterRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    // Only inner joins that have at least one equi-join key pair are candidates.
+    val join = call.rel[Join](0)
+    val isInnerJoin = join.getJoinType == JoinRelType.INNER
+    isInnerJoin && join.analyzeCondition.pairs().nonEmpty
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join = call.rel[LogicalJoin](0)
+    val cluster = join.getCluster
+    val rexBuilder = cluster.getRexBuilder
+    val mq = FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery)
+    val threshold = FlinkRelOptUtil.getTableConfigFromContext(join).getConf.getLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD)
+
+    // Wraps `input` in a filter holding IS NOT NULL on every key whose known null count exceeds
+    // the threshold; returns `input` itself when no key qualifies.
+    def createIsNotNullFilter(input: RelNode, keys: ImmutableIntList): RelNode = {
+      val relBuilder = call.builder()
+      val notNullConds = new mutable.ArrayBuffer[RexNode]
+      for (key <- keys) {
+        val nullCount = mq.getColumnNullCount(input, key)
+        if (nullCount != null && nullCount > threshold) {
+          notNullConds += relBuilder.call(
+            SqlStdOperatorTable.IS_NOT_NULL, rexBuilder.makeInputRef(input, key))
+        }
+      }
+      if (notNullConds.isEmpty) input else relBuilder.push(input).filter(notNullConds).build()
+    }
+
+    val joinInfo = join.analyzeCondition
+    val newLeft = createIsNotNullFilter(join.getLeft, joinInfo.leftKeys)
+    val newRight = createIsNotNullFilter(join.getRight, joinInfo.rightKeys)
+
+    // Reference comparison: an unchanged input is handed back as the very same instance.
+    val inputsChanged = (newLeft ne join.getLeft) || (newRight ne join.getRight)
+    if (inputsChanged) {
+      call.transformTo(join.copy(join.getTraitSet, Seq(newLeft, newRight)))
+    }
+  }
+}
+
+object JoinDeriveNullFilterRule {
+  val INSTANCE: JoinDeriveNullFilterRule = new JoinDeriveNullFilterRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRule.scala
new file mode 100644
index 0000000..2eff11a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRule.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex._
+
+/**
+  * Planner rule that applies various simplifying transformations to a join condition,
+  * e.g.
+  * reduce same expressions: a=b AND b=a -> a=b,
+  * simplify boolean expressions: x = 1 AND FALSE -> FALSE,
+  * simplify cast expressions: CAST('123' as integer) -> 123
+  */
+class SimplifyJoinConditionRule
+  extends RelOptRule(
+    operand(classOf[LogicalJoin], any()),
+    "SimplifyJoinConditionRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join = call.rel[LogicalJoin](0)
+    val originalCond = join.getCondition
+
+    // An always-true condition is left untouched.
+    if (!originalCond.isAlwaysTrue) {
+      val rexBuilder = join.getCluster.getRexBuilder
+      val simplifiedCond = FlinkRexUtil.simplify(rexBuilder, originalCond)
+      val newCondExp = RexUtil.pullFactors(rexBuilder, simplifiedCond)
+
+      // Fire only when the printed form of the condition actually changed.
+      if (newCondExp.toString != originalCond.toString) {
+        val newJoin = join.copy(
+          join.getTraitSet,
+          newCondExp,
+          join.getLeft,
+          join.getRight,
+          join.getJoinType,
+          join.isSemiJoinDone)
+
+        call.transformTo(newJoin)
+        // The original join gets importance 0.0 once its replacement is registered.
+        call.getPlanner.setImportance(join, 0.0)
+      }
+    }
+  }
+}
+
+object SimplifyJoinConditionRule {
+  val INSTANCE: SimplifyJoinConditionRule = new SimplifyJoinConditionRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
index 789b29d..a2bb487 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
@@ -863,13 +863,12 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], isBroadcast=[true], build=[right])
-   :- Calc(select=[a, b, c, CAST(b) AS b0])
-   :  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[broadcast])
-      +- Calc(select=[d])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], isBroadcast=[true], build=[right])
+:- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+:  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[d])
+      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml
index a0d0dc3..aefff81 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml
@@ -179,6 +179,33 @@ NestedLoopJoin(joinType=[FullOuterJoin], where=[<>(a, d)], select=[d, e, f, g, h
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFullOuterWithUsing">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[COALESCE($0, $3)], b=[$1], c=[$2], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2])
+      +- LogicalTableScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CASE(IS NOT NULL(a), a, a0) AS $f0, b, c, b0, c0])
++- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
+   :- Exchange(distribution=[single])
+   :  +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testInnerJoinWithEquiAndLocalPred">
     <Resource name="sql">
       <![CDATA[SELECT c, g FROM MyTable2 INNER JOIN MyTable1 ON a = d AND d < 2]]>
@@ -371,31 +398,6 @@ NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, d], build=[left])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinNonMatchingKeyTypes">
-    <Resource name="sql">
-      <![CDATA[SELECT c, g FROM MyTable1, MyTable2 WHERE a = g]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalFilter(condition=[=($0, $6)])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[c, g])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a, g)], select=[a, c, g], build=[right])
-   :- Calc(select=[a, c])
-   :  +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[broadcast])
-      +- Calc(select=[g])
-         +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testLeftOuterJoinNoEquiPred">
     <Resource name="sql">
       <![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a <> d]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
index 09a0420..f825d25 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
@@ -1053,13 +1053,12 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c])
-+- NestedLoopJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], build=[right])
-   :- Calc(select=[a, b, c, CAST(b) AS b0])
-   :  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[broadcast])
-      +- Calc(select=[d])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right])
+:- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+:  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[d])
+      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -2011,13 +2010,14 @@ LogicalFilter(condition=[<($cor0.a, 10)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[true], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[<(a, 10)])
-:  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
-   +- LocalHashAggregate(select=[])
-      +- Calc(select=[])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[$f3], select=[a, b, c, $f3], build=[right])
+   :- Calc(select=[a, b, c, <(a, 10) AS $f3])
+   :  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[broadcast])
+      +- LocalHashAggregate(select=[])
+         +- Calc(select=[])
+            +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -2348,12 +2348,13 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[OR(=(a, d), IS NULL(a), IS NULL(d))], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[>(b, 10)])
-:  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
-   +- Calc(select=[d])
-      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, d), IS NULL(a), IS NULL(d)), $f3)], select=[a, b, c, $f3], build=[right])
+   :- Calc(select=[a, b, c, >(b, 10) AS $f3])
+   :  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[broadcast])
+      +- Calc(select=[d])
+         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml
index e8efee0..117d880 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml
@@ -1082,14 +1082,13 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], build=[right])
-   :- Exchange(distribution=[hash[a, b0]])
-   :  +- Calc(select=[a, b, c, CAST(b) AS b0])
-   :     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[d, d]])
-      +- Calc(select=[d])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[left])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+:     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+   +- Calc(select=[d])
+      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -2073,13 +2072,14 @@ LogicalFilter(condition=[<($cor0.a, 10)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[true], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[<(a, 10)])
-:  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
-   +- LocalHashAggregate(select=[])
-      +- Calc(select=[])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[$f3], select=[a, b, c, $f3], build=[right])
+   :- Calc(select=[a, b, c, <(a, 10) AS $f3])
+   :  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[broadcast])
+      +- LocalHashAggregate(select=[])
+         +- Calc(select=[])
+            +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -2419,12 +2419,13 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[OR(=(a, d), IS NULL(a), IS NULL(d))], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[>(b, 10)])
-:  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
-   +- Calc(select=[d])
-      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, d), IS NULL(a), IS NULL(d)), $f3)], select=[a, b, c, $f3], build=[right])
+   :- Calc(select=[a, b, c, >(b, 10) AS $f3])
+   :  +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[broadcast])
+      +- Calc(select=[d])
+         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml
index 4be436f..c687cc5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml
@@ -92,6 +92,33 @@ Calc(select=[c, g])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFullOuterWithUsing">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[COALESCE($0, $3)], b=[$1], c=[$2], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2])
+      +- LogicalTableScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CASE(IS NOT NULL(a), a, a0) AS $f0, b, c, b0, c0])
++- HashJoin(joinType=[FullOuterJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[right])
+   :- Exchange(distribution=[hash[a]])
+   :  +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[a]])
+      +- TableSourceScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testInnerJoinWithEquiAndLocalPred">
     <Resource name="sql">
       <![CDATA[SELECT c, g FROM MyTable2 INNER JOIN MyTable1 ON a = d AND d < 2]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
index bd57fc8..9612610 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
@@ -872,14 +872,13 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], build=[right])
-   :- Exchange(distribution=[hash[a, b0]])
-   :  +- Calc(select=[a, b, c, CAST(b) AS b0])
-   :     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[d, d]])
-      +- Calc(select=[d])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[left])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+:     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+   +- Calc(select=[d])
+      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml
index d21ac60..51724cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -92,6 +92,33 @@ Calc(select=[c, g])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFullOuterWithUsing">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[COALESCE($0, $3)], b=[$1], c=[$2], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2])
+      +- LogicalTableScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CASE(IS NOT NULL(a), a, a0) AS $f0, b, c, b0, c0])
++- SortMergeJoin(joinType=[FullOuterJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[hash[a]])
+      +- TableSourceScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testInnerJoinWithEquiAndLocalPred">
     <Resource name="sql">
       <![CDATA[SELECT c, g FROM MyTable2 INNER JOIN MyTable1 ON a = d AND d < 2]]>
@@ -268,32 +295,6 @@ Calc(select=[a, d])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinNonMatchingKeyTypes">
-    <Resource name="sql">
-      <![CDATA[SELECT c, g FROM MyTable1, MyTable2 WHERE a = g]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalFilter(condition=[=($0, $6)])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Calc(select=[c, g])
-+- SortMergeJoin(joinType=[InnerJoin], where=[=(a, g)], select=[a, c, g])
-   :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, c])
-   :     +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[g]])
-      +- Calc(select=[g])
-         +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testLeftOuterJoinWithEquiAndLocalPred">
     <Resource name="sql">
       <![CDATA[SELECT c, g FROM MyTable2 LEFT OUTER JOIN  MyTable1 ON a = d AND d < 2]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
index cba2428..7e427ac 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
@@ -929,14 +929,13 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c])
-+- SortMergeJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0])
-   :- Exchange(distribution=[hash[a, b0]])
-   :  +- Calc(select=[a, b, c, CAST(b) AS b0])
-   :     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-   +- Exchange(distribution=[hash[d, d]])
-      +- Calc(select=[d])
-         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+SortMergeJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+:     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+   +- Calc(select=[d])
+      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.xml
new file mode 100644
index 0000000..34f742f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.xml
@@ -0,0 +1,435 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testFilterPushDownLeftAnti1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+    (SELECT c FROM rightT WHERE c < 3)) T WHERE T.b > 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+  LogicalFilter(condition=[<($0, 3)])
+    LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[OR(=($0, $2), IS NULL($0), IS NULL($2))], joinType=[anti])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[<($0, 3)])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPushDownLeftAnti2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+    (SELECT * FROM rightT where c > 10)) T WHERE T.b > 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[>($0, 10)])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[$2], joinType=[anti])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject($f0=[IS NOT NULL($0)])
+         +- LogicalAggregate(group=[{}], m=[MIN($0)])
+            +- LogicalProject(i=[true])
+               +- LogicalFilter(condition=[>($0, 10)])
+                  +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPushDownLeftAnti3">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+(SELECT c FROM rightT WHERE b = d AND c < 3)) T WHERE T.b > 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+  LogicalFilter(condition=[AND(=($cor0.b, $1), <($0, 3))])
+    LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[AND(OR(=($0, $2), IS NULL($0), IS NULL($2)), =($1, $3))], joinType=[anti])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0], d=[$1])
+         +- LogicalFilter(condition=[<($0, 3)])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPushDownLeftAnti4">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+    (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[anti])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPushDownLeftSemi1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT)) T WHERE T.b > 2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[IN($0, {
+LogicalProject(c=[$0])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPushDownLeftSemi2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT)) T WHERE T.b > 2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[EXISTS({
+LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[$2], joinType=[semi])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject($f0=[IS NOT NULL($0)])
+         +- LogicalAggregate(group=[{}], m=[MIN($0)])
+            +- LogicalProject(i=[true])
+               +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPushDownLeftSemi3">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE EXISTS
+    (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+         +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
+      :- LogicalFilter(condition=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftAnti1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b > 2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+  LogicalFilter(condition=[>($cor0.b, 2)])
+    LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[AND(OR(=($0, $3), IS NULL($0), IS NULL($3)), $2)], joinType=[anti])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftAnti2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE b > 2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[>($cor0.b, 2)])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[$2], joinType=[anti])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftAnti3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b = d AND b > 1)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+  LogicalFilter(condition=[AND(=($cor0.b, $1), >($cor0.b, 1))])
+    LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[AND(OR(=($0, $3), IS NULL($0), IS NULL($3)), =($1, $4), $2)], joinType=[anti])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 1)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0], d=[$1])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftAnti4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, 2))])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[AND(=($0, $3), $2)], joinType=[anti])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+      :  +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftSemi1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT WHERE b > 2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(c=[$0])
+  LogicalFilter(condition=[>($cor0.b, 2)])
+    LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+      :  +- LogicalFilter(condition=[>($1, 2)])
+      :     +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftSemi2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE b > 2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[>($cor0.b, 2)])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[true], joinType=[semi])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+      :  +- LogicalFilter(condition=[>($1, 2)])
+      :     +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionPushDownLeftSemi3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, 2))])
+  LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+      :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+      :  +- LogicalFilter(condition=[>($1, 2)])
+      :     +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(c=[$0])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.xml
new file mode 100644
index 0000000..04b5bb2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.xml
@@ -0,0 +1,361 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testInnerJoin1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($3, $4), =($0, $3))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b = d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4), =($1, $3))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $1), =($3, $4), =($0, $3))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND a = c]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4), =($0, $2))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $2), =($3, $4), =($0, $3))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin4">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b + 1 = d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$4], e=[$5], f=[$6])
++- LogicalJoin(condition=[AND(=($0, $4), =($0, $5), =($3, $4))], joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[+($1, 1)])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$4], e=[$5], f=[$6])
++- LogicalJoin(condition=[AND(=($0, $3), =($5, $4), =($0, $5))], joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[+($1, 1)])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithNonEquiCondition1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a > e]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinWithNonEquiCondition2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b > d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4), >($1, $3))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(>($1, $3), =($3, $4), =($0, $3))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoin_Exist1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a = e)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.a, $1))])
+  LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($3, $4), =($0, $3))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoin_Exist2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS
+    (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b = d)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.a, $1), =($cor0.b, $0))])
+  LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $1), =($3, $4), =($0, $3))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoin_In1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+  LogicalFilter(condition=[=($cor0.a, $1)])
+    LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($3, $4), =($0, $3))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoin_In2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e AND b = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+  LogicalFilter(condition=[AND(=($cor0.a, $1), =($cor0.b, $0))])
+    LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $1), =($3, $4), =($0, $3))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoinWithNonEquiCondition_Exist1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a > e)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.a, $1))])
+  LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoinWithNonEquiCondition_Exist2">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS
+    (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b > d)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.a, $1), >($cor0.b, $0))])
+  LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(>($1, $3), =($3, $4), =($0, $3))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoinWithNonEquiCondition_In1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+  LogicalFilter(condition=[>($cor0.a, $1)])
+    LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSemiJoinWithNonEquiCondition_In2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e AND b = d)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+  LogicalFilter(condition=[AND(>($cor0.a, $1), =($cor0.b, $0))])
+    LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(>($0, $4), =($0, $1), =($0, $3))], joinType=[semi])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(d=[$0], e=[$1])
+      +- LogicalFilter(condition=[true])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.xml
new file mode 100644
index 0000000..9dfc80e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.xml
@@ -0,0 +1,266 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testInToSemiJoinDoubleEqualsDecimal">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.d IN (SELECT e FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($3, {
+LogicalProject(e=[$4])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalJoin(condition=[=($3, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   :- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalProject(e=[$0], e0=[CAST($0):DOUBLE]), rowType=[RecordType(DECIMAL(38, 18) e, DOUBLE e0)]
+      +- LogicalProject(e=[$4]), rowType=[RecordType(DECIMAL(38, 18) e)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInToSemiJoinFloatEqualsDecimal">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.c IN (SELECT e FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(e=[$4])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalJoin(condition=[=($5, $7)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE c0)]
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], c0=[CAST($2):DOUBLE]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE c0)]
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+      +- LogicalProject(e=[$0], e0=[CAST($0):DOUBLE]), rowType=[RecordType(DECIMAL(38, 18) e, DOUBLE e0)]
+         +- LogicalProject(e=[$4]), rowType=[RecordType(DECIMAL(38, 18) e)]
+            +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInToSemiJoinIntEqualsDecimal">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT e FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$4])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DECIMAL(38, 18) a0)]
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):DECIMAL(38, 18)]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DECIMAL(38, 18) a0)]
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+      +- LogicalProject(e=[$4]), rowType=[RecordType(DECIMAL(38, 18) e)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInToSemiJoinFloatEqualsDouble">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.c IN (SELECT d FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(d=[$3])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalJoin(condition=[=($2, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   :- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalProject(d=[$0], d0=[CAST($0):FLOAT]), rowType=[RecordType(DOUBLE d, FLOAT d0)]
+      +- LogicalProject(d=[$3]), rowType=[RecordType(DOUBLE d)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInToSemiJoinIntEqualsDouble">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT d FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$3])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE a0)]
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):DOUBLE]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE a0)]
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+      +- LogicalProject(d=[$3]), rowType=[RecordType(DOUBLE d)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInToSemiJoinIntEqualsFloat">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT c FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(c=[$2])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, FLOAT a0)]
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):FLOAT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, FLOAT a0)]
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+      +- LogicalProject(c=[$2]), rowType=[RecordType(FLOAT c)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInToSemiJoinIntEqualsLong">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT b FROM T2)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(b=[$1])
+  LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):BIGINT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+      :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+      +- LogicalProject(b=[$1]), rowType=[RecordType(BIGINT b)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionEqualsTypesNotEquals01">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM T1 LEFT JOIN (SELECT COUNT(*) AS cnt FROM T2) AS x ON a = x.cnt]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(INTEGER a)]
++- LogicalJoin(condition=[=($5, $6)], joinType=[left]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0, BIGINT cnt)]
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):BIGINT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+   :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalAggregate(group=[{}], cnt=[COUNT()]), rowType=[RecordType(BIGINT cnt)]
+      +- LogicalProject($f0=[0]), rowType=[RecordType(INTEGER $f0)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(INTEGER a)]
++- LogicalJoin(condition=[=($5, $6)], joinType=[left]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0, BIGINT cnt)]
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):BIGINT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+   :  +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+   +- LogicalAggregate(group=[{}], cnt=[COUNT()]), rowType=[RecordType(BIGINT cnt)]
+      +- LogicalProject($f0=[0]), rowType=[RecordType(INTEGER $f0)]
+         +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinConditionEqualsTypesNotEquals02">
+    <Resource name="sql">
+      <![CDATA[
+-- TC 01.04
+SELECT t3a,
+       t3b
+FROM   T3
+WHERE  t3c IN (SELECT t4b
+               FROM   T4
+               WHERE  t3a = t4a
+                       OR t3b > t4b)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(t3a=[$0], t3b=[$1]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(t4b=[$1])
+  LogicalFilter(condition=[OR(=($cor0.t3a, $0), >($cor0.t3b, $1))])
+    LogicalTableScan(table=[[T4, source: [TestTableSource(t4a, t4b, t4c)]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+   +- LogicalTableScan(table=[[T3, source: [TestTableSource(t3a, t3b, t3c)]]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(t3a=[$0], t3b=[$1]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
++- LogicalJoin(condition=[AND(=($0, $4), =($1, $5), =($2, CAST($3):INTEGER))], joinType=[inner]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c, SMALLINT t4b, VARCHAR(65536) t3a0, SMALLINT t3b0)]
+   :- LogicalTableScan(table=[[T3, source: [TestTableSource(t3a, t3b, t3c)]]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+   +- LogicalAggregate(group=[{0, 1, 2}]), rowType=[RecordType(SMALLINT t4b, VARCHAR(65536) t3a, SMALLINT t3b)]
+      +- LogicalProject(t4b=[$1], t3a=[$3], t3b=[$4]), rowType=[RecordType(SMALLINT t4b, VARCHAR(65536) t3a, SMALLINT t3b)]
+         +- LogicalJoin(condition=[OR(=($3, $0), >($4, $1))], joinType=[inner]), rowType=[RecordType(VARCHAR(65536) t4a, SMALLINT t4b, INTEGER t4c, VARCHAR(65536) t3a, SMALLINT t3b)]
+            :- LogicalTableScan(table=[[T4, source: [TestTableSource(t4a, t4b, t4c)]]]), rowType=[RecordType(VARCHAR(65536) t4a, SMALLINT t4b, INTEGER t4c)]
+            +- LogicalAggregate(group=[{0, 1}]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
+               +- LogicalProject(t3a=[$0], t3b=[$1]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
+                  +- LogicalTableScan(table=[[T3, source: [TestTableSource(t3a, t3b, t3c)]]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
new file mode 100644
index 0000000..4ff1748
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testThreeOr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (b = e AND a = 0) OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 0), =($0, 1), =($0, 2)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAnd">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE b = e AND ((a = 1 AND d = 2) OR (a = 2 AND d = 1))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCanNotMatchThisRule">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE b = e OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(=($1, $4), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[OR(=($1, $4), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiFields">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND b = 1 AND d = 2 AND e = 2) OR (a = 2 AND b = 2 AND d = 1 AND e = 1)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=($0, 1), =($1, 1)), AND(=($0, 2), =($1, 2))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAndOr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE ((a = 1 AND d = 2) OR (a = 2 AND d = 1)) AND ((a = 3 AND d = 4) OR (a = 4 AND d = 3))]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)), OR(=($0, 3), =($0, 4)), OR(=($3, 4), =($3, 3)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiJoins">
+    <Resource name="sql">
+      <![CDATA[
+SELECT T1.a, T2.d FROM MyTable1 T1,
+  (SELECT * FROM MyTable1, MyTable2 WHERE a = d) T2 WHERE
+(T1.a = 1 AND T1.b = 1 AND T2.a = 2 AND T2.e = 2)
+OR
+(T1.a = 2 AND T2.b = 2 AND T2.d = 1 AND T2.e = 1)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$6])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7])
+         +- LogicalFilter(condition=[=($0, $3)])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+               +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$6])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=($0, 1), =($1, 1)), =($0, 2)), OR(AND(=($3, 2), =($7, 2)), AND(=($4, 2), =($6, 1), =($7, 1))))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7])
+      +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+         :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+         +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiSingleSideFields">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND b = 1 AND d = 2 AND e = 2) OR (d = 1 AND e = 1)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimple">
+    <Resource name="sql">
+      <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND d = 2) OR (a = 2 AND d = 1)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.xml
new file mode 100644
index 0000000..58233bf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testFullJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 FULL JOIN MyTable2 ON c1 = c2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[full])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[full])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin_NoneEquiJoinKeys">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a1 > a2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[>($0, $5)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[>($0, $5)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin_NoNullCount">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON d1 = d2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($3, $8)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($3, $8)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin_NullCountLessThanThreshold">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON e1 = e2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($4, $9)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($4, $9)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin_NullCountOnLeftJoinKeys">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a1 = a2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :- LogicalFilter(condition=[IS NOT NULL($0)])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin_NullCountOnLeftRightJoinKeys">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON c1 = c2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[inner])
+   :- LogicalFilter(condition=[IS NOT NULL($2)])
+   :  +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalFilter(condition=[IS NOT NULL($2)])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON c1 = c2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[left])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[left])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoin_NullCountOnRightJoinKeys">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON b1 = b2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($1, $6)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($1, $6)], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalFilter(condition=[IS NOT NULL($1)])
+      +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRightJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON c1 = c2]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[right])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[right])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
new file mode 100644
index 0000000..35d22ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testSimplifyJoinConditionFromSubQuery">
+    <Resource name="sql">
+      <![CDATA[
+SELECT a FROM MyTable1 WHERE b = (
+    SELECT COUNT(*) FROM MyTable2 WHERE (d = a AND d < 2) OR (d = a AND b = 5))
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[=($1, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalFilter(condition=[OR(AND(=($0, $cor0.a), <($0, 2)), AND(=($0, $cor0.a), =($cor0.b, 5)))])
+      LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[=($1, CASE(IS NULL($5), 0:BIGINT, $5))])
+   +- LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[left])
+      :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+      +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()])
+         +- LogicalProject(a=[$3], b=[$4], $f0=[0])
+            +- LogicalJoin(condition=[AND(=($0, $3), OR(<($0, 2), =($4, 5)))], joinType=[inner])
+               :- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+               +- LogicalAggregate(group=[{0, 1}])
+                  +- LogicalProject(a=[$0], b=[$1])
+                     +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimplifyJoinCondition">
+    <Resource name="sql">
+      <![CDATA[SELECT d FROM MyTable1 JOIN MyTable2 ON (d = a AND a > 2) OR (d = a AND b = 1)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[OR(AND(=($3, $0), >($0, 2)), AND(=($3, $0), =($1, 1)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[AND(=($3, $0), OR(>($0, 2), =($1, 1)))], joinType=[inner])
+   :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
index ac4f50d..bffdd28 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
@@ -1794,13 +1794,15 @@ LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
-   :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(EXPR$0=[$1], f=[$0])
-      +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
-         +- LogicalProject(f=[$2], e=[$1])
-            +- LogicalFilter(condition=[true])
-               +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalJoin(condition=[AND(=($3, $4), =($2, $5))], joinType=[semi])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], b0=[CAST($1):DOUBLE])
+      :  +- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
+      +- LogicalProject(EXPR$0=[$1], f=[$0])
+         +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
+            +- LogicalProject(f=[$2], e=[$1])
+               +- LogicalFilter(condition=[true])
+                  +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -1825,14 +1827,15 @@ LogicalProject(d=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), >($2, $4))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $5), >($2, $4))], joinType=[semi])
    :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(d=[$0], f=[$1])
-      +- LogicalFilter(condition=[true])
-         +- LogicalProject(d=[$1], f=[$0])
-            +- LogicalAggregate(group=[{0}], d=[MAX($1)])
-               +- LogicalProject(f=[$2], d=[$0])
-                  +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+   +- LogicalProject(d=[$0], f=[$1], d0=[CAST($0):BIGINT])
+      +- LogicalProject(d=[$0], f=[$1])
+         +- LogicalFilter(condition=[true])
+            +- LogicalProject(d=[$1], f=[$0])
+               +- LogicalAggregate(group=[{0}], d=[MAX($1)])
+                  +- LogicalProject(f=[$2], d=[$0])
+                     +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -2175,11 +2178,13 @@ LogicalProject(d=[$1])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[AND(=($0, $2), =($2, $1))], joinType=[semi])
-   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
-   +- LogicalProject(d=[$1])
-      +- LogicalFilter(condition=[true])
-         +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[AND(=($2, $3), =($3, $1))], joinType=[semi])
+      :- LogicalProject(a=[$0], b=[$1], a0=[CAST($0):BIGINT])
+      :  +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(d=[$1])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -2204,14 +2209,15 @@ LogicalProject(d=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $5), =($2, $4))], joinType=[semi])
    :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(d=[$0], k=[$4])
-      +- LogicalJoin(condition=[=($1, $3)], joinType=[inner])
-         :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(j=[$1], k=[$2])
-            +- LogicalFilter(condition=[>($0, 10)])
-               +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
+   +- LogicalProject(d=[$0], k=[$1], d0=[CAST($0):BIGINT])
+      +- LogicalProject(d=[$0], k=[$4])
+         +- LogicalJoin(condition=[=($1, $3)], joinType=[inner])
+            :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+            +- LogicalProject(j=[$1], k=[$2])
+               +- LogicalFilter(condition=[>($0, 10)])
+                  +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -2498,14 +2504,15 @@ LogicalProject(d=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $5), =($2, $4))], joinType=[semi])
    :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(d=[$0], k=[$4])
-      +- LogicalJoin(condition=[=($1, $3)], joinType=[right])
-         :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(j=[$1], k=[$2])
-            +- LogicalFilter(condition=[>($0, 10)])
-               +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
+   +- LogicalProject(d=[$0], k=[$1], d0=[CAST($0):BIGINT])
+      +- LogicalProject(d=[$0], k=[$4])
+         +- LogicalJoin(condition=[=($1, $3)], joinType=[right])
+            :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+            +- LogicalProject(j=[$1], k=[$2])
+               +- LogicalFilter(condition=[>($0, 10)])
+                  +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -3503,27 +3510,29 @@ LogicalProject(d=[$0], e=[$1])
       <![CDATA[
 LogicalProject(c=[$2])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalJoin(condition=[AND(=($3, $5), =($4, $6))], joinType=[semi])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[CASE(AND(<>($3, 0), IS NOT NULL($6), IS NOT NULL($0)), 1, 2)], $f4=[CASE(AND(<>($7, 0), IS NOT NULL($10), IS NOT NULL($1)), 3, 4)])
-      :  +- LogicalJoin(condition=[=($1, $9)], joinType=[left])
-      :     :- LogicalJoin(condition=[true], joinType=[inner])
-      :     :  :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
-      :     :  :  :- LogicalJoin(condition=[true], joinType=[inner])
-      :     :  :  :  :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
-      :     :  :  :  +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
-      :     :  :  :     +- LogicalProject(i=[$0])
-      :     :  :  :        +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
-      :     :  :  +- LogicalAggregate(group=[{0, 1}])
-      :     :  :     +- LogicalProject(i=[$0], i0=[true])
-      :     :  :        +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
-      :     :  +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
-      :     :     +- LogicalProject(j=[$0])
-      :     :        +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
-      :     +- LogicalAggregate(group=[{0, 1}])
-      :        +- LogicalProject(j=[$0], i=[true])
-      :           +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
-      +- LogicalProject(d=[$0], e=[$1])
-         +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[$4])
+      +- LogicalJoin(condition=[AND(=($3, $6), =($5, $7))], joinType=[semi])
+         :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[$4], $f40=[CAST($4):BIGINT NOT NULL])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[CASE(AND(<>($3, 0), IS NOT NULL($6), IS NOT NULL($0)), 1, 2)], $f4=[CASE(AND(<>($7, 0), IS NOT NULL($10), IS NOT NULL($1)), 3, 4)])
+         :     +- LogicalJoin(condition=[=($1, $9)], joinType=[left])
+         :        :- LogicalJoin(condition=[true], joinType=[inner])
+         :        :  :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+         :        :  :  :- LogicalJoin(condition=[true], joinType=[inner])
+         :        :  :  :  :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
+         :        :  :  :  +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
+         :        :  :  :     +- LogicalProject(i=[$0])
+         :        :  :  :        +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
+         :        :  :  +- LogicalAggregate(group=[{0, 1}])
+         :        :  :     +- LogicalProject(i=[$0], i0=[true])
+         :        :  :        +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
+         :        :  +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
+         :        :     +- LogicalProject(j=[$0])
+         :        :        +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
+         :        +- LogicalAggregate(group=[{0, 1}])
+         :           +- LogicalProject(j=[$0], i=[true])
+         :              +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
+         +- LogicalProject(d=[$0], e=[$1])
+            +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -3573,14 +3582,15 @@ LogicalProject(d=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(c=[$2])
-+- LogicalJoin(condition=[=($1, $3)], joinType=[semi])
++- LogicalJoin(condition=[=($1, $4)], joinType=[semi])
    :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
-   +- LogicalProject(d=[$0])
-      +- LogicalJoin(condition=[=($1, $3)], joinType=[full])
-         :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(j=[$1])
-            +- LogicalFilter(condition=[>($0, 10)])
-               +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
+   +- LogicalProject(d=[$0], d0=[CAST($0):BIGINT])
+      +- LogicalProject(d=[$0])
+         +- LogicalJoin(condition=[=($1, $3)], joinType=[full])
+            :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+            +- LogicalProject(j=[$1])
+               +- LogicalFilter(condition=[>($0, 10)])
+                  +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -3665,10 +3675,12 @@ LogicalProject(d=[$1])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
-   :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
-   +- LogicalProject(d=[$1])
-      +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalJoin(condition=[=($2, $3)], joinType=[semi])
+      :- LogicalProject(a=[$0], b=[$1], a0=[CAST($0):BIGINT])
+      :  +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
+      +- LogicalProject(d=[$1])
+         +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -4385,14 +4397,15 @@ LogicalProject(e=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[AND(=($1, $2), =($3, $1))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $4), =($3, $1))], joinType=[semi])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
    :  :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
    :  +- LogicalProject(c=[$0])
    :     +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0], f=[$1])
-      +- LogicalFilter(condition=[true])
-         +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
+   +- LogicalProject(e=[$0], f=[$1], e0=[CAST($0):BIGINT])
+      +- LogicalProject(e=[$0], f=[$1])
+         +- LogicalFilter(condition=[true])
+            +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -4480,13 +4493,14 @@ LogicalProject(e=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[=($1, $2)], joinType=[semi])
++- LogicalJoin(condition=[=($1, $3)], joinType=[semi])
    :- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
    :  :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
    :  +- LogicalProject(c=[$0])
    :     +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
-   +- LogicalProject(e=[$0])
-      +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
+   +- LogicalProject(e=[$0], e0=[CAST($0):BIGINT])
+      +- LogicalProject(e=[$0])
+         +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
@@ -4544,7 +4558,7 @@ LogicalProject(e=[$0])
       <![CDATA[
 LogicalProject(a=[$0], b=[$1])
 +- LogicalProject(a=[$0], b=[$1])
-   +- LogicalJoin(condition=[=($3, $4)], joinType=[semi])
+   +- LogicalJoin(condition=[=($3, $5)], joinType=[semi])
       :- LogicalProject(a=[$0], b=[$1], $f2=[$2], $f3=[*($1, 2)])
       :  +- LogicalJoin(condition=[=($2, $3)], joinType=[semi])
       :     :- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)])
@@ -4552,9 +4566,10 @@ LogicalProject(a=[$0], b=[$1])
       :     +- LogicalProject(c=[$0])
       :        +- LogicalFilter(condition=[>($1, 10)])
       :           +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
-      +- LogicalProject(e=[$0])
-         +- LogicalFilter(condition=[<($1, 10)])
-            +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
+      +- LogicalProject(e=[$0], e0=[CAST($0):BIGINT])
+         +- LogicalProject(e=[$0])
+            +- LogicalFilter(condition=[<($1, 10)])
+               +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml
index 55657c9..bbc870f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -2067,13 +2067,14 @@ LogicalFilter(condition=[<($cor0.a, 10)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Join(joinType=[LeftAntiJoin], where=[true], select=[a, b, c])
-:- Exchange(distribution=[single])
-:  +- Calc(select=[a, b, c], where=[<(a, 10)])
-:     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[single])
-   +- Calc(select=[])
-      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- Join(joinType=[LeftAntiJoin], where=[$f3], select=[a, b, c, $f3])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a, b, c, <(a, 10) AS $f3])
+   :     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[])
+         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
@@ -2380,13 +2381,14 @@ LogicalProject(d=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Join(joinType=[LeftAntiJoin], where=[OR(=(a, d), IS NULL(a), IS NULL(d))], select=[a, b, c])
-:- Exchange(distribution=[single])
-:  +- Calc(select=[a, b, c], where=[>(b, 10)])
-:     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[single])
-   +- Calc(select=[d])
-      +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(a, d), IS NULL(a), IS NULL(d)), $f3)], select=[a, b, c, $f3])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a, b, c, >(b, 10) AS $f3])
+   :     +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[d])
+         +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml
index 4cd79fb..6cc4d22 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml
@@ -35,12 +35,12 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[AND(=(a, a0), =(PROCTIME_MATERIALIZE(proctime), PROCTIME_MATERIALIZE(proctime0)))], select=[a, proctime, a0, b, proctime0])
-   :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, proctime])
++- Join(joinType=[InnerJoin], where=[AND(=(a, a0), =($f5, $f50))], select=[a, $f5, a0, b, $f50])
+   :- Exchange(distribution=[hash[a, $f5]])
+   :  +- Calc(select=[a, PROCTIME_MATERIALIZE(proctime) AS $f5])
    :     +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, b, proctime])
+   +- Exchange(distribution=[hash[a, $f5]])
+      +- Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS $f5])
          +- DataStreamScan(table=[[_DataStreamTable_1]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
@@ -64,12 +64,12 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[AND(=(a, a0), =(CAST(rowtime), CAST(rowtime0)))], select=[a, rowtime, a0, b, rowtime0])
-   :- Exchange(distribution=[hash[a]])
-   :  +- Calc(select=[a, rowtime])
++- Join(joinType=[InnerJoin], where=[AND(=(a, a0), =(rowtime0, rowtime00))], select=[a, rowtime0, a0, b, rowtime00])
+   :- Exchange(distribution=[hash[a, rowtime0]])
+   :  +- Calc(select=[a, CAST(rowtime) AS rowtime0])
    :     +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
-   +- Exchange(distribution=[hash[a]])
-      +- Calc(select=[a, b, rowtime])
+   +- Exchange(distribution=[hash[a, rowtime0]])
+      +- Calc(select=[a, b, CAST(rowtime) AS rowtime0])
          +- DataStreamScan(table=[[_DataStreamTable_1]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
index 0bde6c9..fcfd2cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
@@ -34,13 +34,6 @@ class BroadcastHashJoinTest extends JoinTestBase {
   }
 
   @Test
-  override def testJoinNonMatchingKeyTypes(): Unit = {
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Equality join predicate on incompatible types")
-    super.testJoinNonMatchingKeyTypes()
-  }
-
-  @Test
   override def testInnerJoinWithoutJoinPred(): Unit = {
     thrown.expect(classOf[TableException])
     thrown.expectMessage("Cannot generate a valid execution plan for the given query")
@@ -125,6 +118,13 @@ class BroadcastHashJoinTest extends JoinTestBase {
   }
 
   @Test
+  override def testFullOuterWithUsing(): Unit = {
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Cannot generate a valid execution plan for the given query")
+    super.testFullOuterWithUsing()
+  }
+
+  @Test
   override def testFullOuterJoinOnTrue(): Unit = {
     thrown.expect(classOf[TableException])
     thrown.expectMessage("Cannot generate a valid execution plan for the given query")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala
index 120501f..f7a96cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.plan.batch.sql.join
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableConfigOptions, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
 
 import org.junit.Test
@@ -34,9 +34,9 @@ abstract class JoinTestBase extends TableTestBase {
     util.verifyPlan("SELECT c, g FROM MyTable1, MyTable2 WHERE foo = e")
   }
 
-  @Test
+  @Test(expected = classOf[TableException])
   def testJoinNonMatchingKeyTypes(): Unit = {
-    // TODO do implicit type coercion
+    // INTEGER and VARCHAR(65536) do not have a common type now
     util.verifyPlan("SELECT c, g FROM MyTable1, MyTable2 WHERE a = g")
   }
 
@@ -182,6 +182,16 @@ abstract class JoinTestBase extends TableTestBase {
   }
 
   @Test
+  def testFullOuterWithUsing(): Unit = {
+    util.addTableSource[(Int, Long, String)]("MyTable3", 'a, 'b, 'c)
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
   def testCrossJoin(): Unit = {
     util.verifyPlan("SELECT * FROM MyTable2 CROSS JOIN MyTable1")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 94f015e..9aa2347 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -23,8 +23,9 @@ import org.apache.flink.table.calcite.CalciteConfig
 import org.apache.flink.table.plan.optimize.program.FlinkBatchProgram
 import org.apache.flink.table.plan.stream.sql.join.TestTemporalTable
 import org.apache.flink.table.util.TableTestBase
+
 import org.junit.Assert.{assertTrue, fail}
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 class LookupJoinTest extends TableTestBase {
   private val testUtil = batchTestUtil()
@@ -121,6 +122,9 @@ class LookupJoinTest extends TableTestBase {
       .replaceBatchProgram(programs).build()
     testUtil.tableEnv.getConfig.setCalciteConfig(calciteConfig)
 
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("VARCHAR(65536) and INTEGER does not have common type now")
+
     testUtil.verifyPlan("SELECT * FROM MyTable AS T JOIN temporalTest "
       + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
   }
@@ -248,8 +252,7 @@ class LookupJoinTest extends TableTestBase {
   private def expectExceptionThrown(
     sql: String,
     keywords: String,
-    clazz: Class[_ <: Throwable] = classOf[ValidationException])
-  : Unit = {
+    clazz: Class[_ <: Throwable] = classOf[ValidationException]): Unit = {
     try {
       testUtil.verifyExplain(sql)
       fail(s"Expected a $clazz, but no exception is thrown.")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
index 3199f93..a5b639c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
@@ -32,13 +32,6 @@ class ShuffledHashJoinTest extends JoinTestBase {
   }
 
   @Test
-  override def testJoinNonMatchingKeyTypes(): Unit = {
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Equality join predicate on incompatible types")
-    super.testJoinNonMatchingKeyTypes()
-  }
-
-  @Test
   override def testInnerJoinWithoutJoinPred(): Unit = {
     thrown.expect(classOf[TableException])
     thrown.expectMessage("Cannot generate a valid execution plan for the given query")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.scala
new file mode 100644
index 0000000..3f08f1f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram,
+  FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[FlinkFilterJoinRule]].
+  */
+class FlinkFilterJoinRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+    util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(
+          FilterProjectTransposeRule.INSTANCE,
+          FlinkFilterJoinRule.FILTER_ON_JOIN,
+          FlinkFilterJoinRule.JOIN))
+        .build()
+    )
+
+    util.addTableSource[(Int, Long)]("leftT", 'a, 'b)
+    util.addTableSource[(Int, Long)]("rightT", 'c, 'd)
+  }
+
+  @Test
+  def testFilterPushDownLeftSemi1(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM (SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT)) T WHERE T.b > 2"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testFilterPushDownLeftSemi2(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM (SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT)) T WHERE T.b > 2"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testFilterPushDownLeftSemi3(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM leftT WHERE EXISTS
+        |    (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftSemi1(): Unit = {
+    util.verifyPlan("SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT WHERE b > 2)")
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftSemi2(): Unit = {
+    util.verifyPlan("SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE b > 2)")
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftSemi3(): Unit = {
+    util.verifyPlan("SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)")
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti1(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+        |    (SELECT c FROM rightT WHERE c < 3)) T WHERE T.b > 2
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti2(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+        |    (SELECT * FROM rightT where c > 10)) T WHERE T.b > 2
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti3(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+        |(SELECT c FROM rightT WHERE b = d AND c < 3)) T WHERE T.b > 2
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testFilterPushDownLeftAnti4(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+        |    (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti1(): Unit = {
+    util.verifyPlan("SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b > 2)")
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti2(): Unit = {
+    util.verifyPlan("SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE b > 2)")
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti3(): Unit = {
+    val sqlQuery = "SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b = d AND b > 1)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testJoinConditionPushDownLeftAnti4(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)"
+    util.verifyPlan(sqlQuery)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.scala
new file mode 100644
index 0000000..e1a5b6f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+
+/**
+  * Test for [[JoinConditionEqualityTransferRule]].
+  */
+class JoinConditionEqualityTransferRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+    util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(JoinConditionEqualityTransferRule.INSTANCE))
+        .build()
+    )
+
+    util.addTableSource[(Int, Int, Int)]("MyTable1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Int, Int)]("MyTable2", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testInnerJoin1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e")
+  }
+
+  @Test
+  def testInnerJoin2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b = d")
+  }
+
+  @Test
+  def testInnerJoin3(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND a = c")
+  }
+
+  @Test
+  def testInnerJoin4(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b + 1 = d")
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiCondition1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a > e")
+  }
+
+  @Test
+  def testInnerJoinWithNonEquiCondition2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b > d")
+  }
+
+  @Test
+  def testSemiJoin_In1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e)")
+  }
+
+  @Test
+  def testSemiJoin_In2(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e AND b = d)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testSemiJoinWithNonEquiCondition_In1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e)")
+  }
+
+  @Test
+  def testSemiJoinWithNonEquiCondition_In2(): Unit = {
+    val sqlQuery =
+      "SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e AND b = d)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testSemiJoin_Exist1(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a = e)
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testSemiJoin_Exist2(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM MyTable1 WHERE EXISTS
+        |    (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b = d)
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testSemiJoinWithNonEquiCondition_Exist1(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a > e)
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testSemiJoinWithNonEquiCondition_Exist2(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT * FROM MyTable1 WHERE EXISTS
+        |    (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b > d)
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.scala
new file mode 100644
index 0000000..0f1cf9b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[JoinConditionTypeCoerceRule]].
+  * Currently only the semi-join rewrite loses type consistency, so we only cover this kind of
+  * case.
+  */
+class JoinConditionTypeCoerceRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+    util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(JoinConditionTypeCoerceRule.INSTANCE))
+        .build()
+    )
+
+    util.addTableSource[(Int, Long, Float, Double, java.math.BigDecimal)]("T1", 'a, 'b, 'c, 'd, 'e)
+    util.addTableSource[(Int, Long, Float, Double, java.math.BigDecimal)]("T2", 'a, 'b, 'c, 'd, 'e)
+  }
+
+  @Test
+  def testInToSemiJoinIntEqualsLong(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT b FROM T2)")
+  }
+
+  @Test
+  def testInToSemiJoinIntEqualsFloat(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT c FROM T2)")
+  }
+
+  @Test
+  def testInToSemiJoinIntEqualsDouble(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT d FROM T2)")
+  }
+
+  @Test
+  def testInToSemiJoinIntEqualsDecimal(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT e FROM T2)")
+  }
+
+  @Test
+  def testInToSemiJoinFloatEqualsDouble(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.c IN (SELECT d FROM T2)")
+  }
+
+  @Test
+  def testInToSemiJoinFloatEqualsDecimal(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.c IN (SELECT e FROM T2)")
+  }
+
+  @Test
+  def testInToSemiJoinDoubleEqualsDecimal(): Unit = {
+    util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.d IN (SELECT e FROM T2)")
+  }
+
+  // Test nullability mismatch.
+  @Test
+  def testJoinConditionEqualsTypesNotEquals01(): Unit = {
+    val sqlQuery = "SELECT a FROM T1 LEFT JOIN (SELECT COUNT(*) AS cnt FROM T2) AS x ON a = x.cnt"
+    util.verifyPlanWithType(sqlQuery)
+  }
+
+  // A join with an OR predicate is not rewritten by the subquery rule but by decorrelation.
+  @Test
+  def testJoinConditionEqualsTypesNotEquals02(): Unit = {
+    util.addTableSource[(String, Short, Int)]("T3", 't3a, 't3b, 't3c)
+    util.addTableSource[(String, Short, Int)]("T4", 't4a, 't4b, 't4c)
+
+    val sqlQuery =
+      """
+        |-- TC 01.04
+        |SELECT t3a,
+        |       t3b
+        |FROM   T3
+        |WHERE  t3c IN (SELECT t4b
+        |               FROM   T4
+        |               WHERE  t3a = t4a
+        |                       OR t3b > t4b)
+      """.stripMargin
+    util.verifyPlanWithType(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.scala
new file mode 100644
index 0000000..47eaf76
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+
+/**
+  * Test for [[JoinDependentConditionDerivationRule]].
+  */
+class JoinDependentConditionDerivationRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+    util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(
+          FlinkFilterJoinRule.FILTER_ON_JOIN,
+          FlinkFilterJoinRule.JOIN,
+          JoinDependentConditionDerivationRule.INSTANCE))
+        .build()
+    )
+
+    util.addTableSource[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, Int, String, Long)]("MyTable2", 'd, 'e, 'f, 'g, 'h)
+  }
+
+  @Test
+  def testSimple(): Unit = {
+    val sqlQuery =
+      "SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND d = 2) OR (a = 2 AND d = 1)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testAnd(): Unit = {
+    val sqlQuery =
+      "SELECT a, d FROM MyTable1, MyTable2 WHERE b = e AND ((a = 1 AND d = 2) OR (a = 2 AND d = 1))"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testCanNotMatchThisRule(): Unit = {
+    val sqlQuery =
+      "SELECT a, d FROM MyTable1, MyTable2 WHERE b = e OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testThreeOr(): Unit = {
+    val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+      "(b = e AND a = 0) OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testAndOr(): Unit = {
+    val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+      "((a = 1 AND d = 2) OR (a = 2 AND d = 1)) AND ((a = 3 AND d = 4) OR (a = 4 AND d = 3))"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiFields(): Unit = {
+    val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+      "(a = 1 AND b = 1 AND d = 2 AND e = 2) OR (a = 2 AND b = 2 AND d = 1 AND e = 1)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiSingleSideFields(): Unit = {
+    val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+      "(a = 1 AND b = 1 AND d = 2 AND e = 2) OR (d = 1 AND e = 1)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiJoins(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT T1.a, T2.d FROM MyTable1 T1,
+        |  (SELECT * FROM MyTable1, MyTable2 WHERE a = d) T2 WHERE
+        |(T1.a = 1 AND T1.b = 1 AND T2.a = 2 AND T2.e = 2)
+        |OR
+        |(T1.a = 2 AND T2.b = 2 AND T2.d = 1 AND T2.e = 1)
+      """.stripMargin
+    util.verifyPlan(sqlQuery)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
new file mode 100644
index 0000000..b70cdaa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{PlannerConfigOptions, Types}
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConversions._
+
+
+/**
+  * Test for [[JoinDeriveNullFilterRule]].
+  */
+class JoinDeriveNullFilterRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+    util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(JoinDeriveNullFilterRule.INSTANCE))
+        .build()
+    )
+
+    util.tableEnv.getConfig.getConf.setLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD, 2000000)
+    util.addTableSource("MyTable1",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING, Types.INT, Types.LONG),
+      Array("a1", "b1", "c1", "d1", "e1"),
+      Some(new TableStats(1000000000, Map(
+        "a1" -> new ColumnStats(null, 10000000L, 4.0, 4, null, null),
+        "c1" -> new ColumnStats(null, 5000000L, 10.2, 16, null, null),
+        "e1" -> new ColumnStats(null, 500000L, 8.0, 8, null, null)
+      ))))
+    util.addTableSource("MyTable2",
+      Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING, Types.INT, Types.LONG),
+      Array("a2", "b2", "c2", "d2", "e2"),
+      Some(new TableStats(2000000000, Map(
+        "b2" -> new ColumnStats(null, 10000000L, 8.0, 8, null, null),
+        "c2" -> new ColumnStats(null, 3000000L, 18.6, 32, null, null),
+        "e2" -> new ColumnStats(null, 1500000L, 8.0, 8, null, null)
+      ))))
+  }
+
+  @Test
+  def testInnerJoin_NoneEquiJoinKeys(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a1 > a2")
+  }
+
+  @Test
+  def testInnerJoin_NullCountOnLeftJoinKeys(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a1 = a2")
+  }
+
+  @Test
+  def testInnerJoin_NullCountOnRightJoinKeys(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON b1 = b2")
+  }
+
+  @Test
+  def testInnerJoin_NullCountOnLeftRightJoinKeys(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON c1 = c2")
+  }
+
+  @Test
+  def testInnerJoin_NoNullCount(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON d1 = d2")
+  }
+
+  @Test
+  def testInnerJoin_NullCountLessThanThreshold(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON e1 = e2")
+  }
+
+  @Test
+  def testLeftJoin(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON c1 = c2")
+  }
+
+  @Test
+  def testRightJoin(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON c1 = c2")
+  }
+
+  @Test
+  def testFullJoin(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable1 FULL JOIN MyTable2 ON c1 = c2")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
new file mode 100644
index 0000000..b8ad2e7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Plan tests for [[SimplifyJoinConditionRule]]. The rule is run in its own HEP program
+  * that is appended to the batch program prepared in [[setup]].
+  */
+class SimplifyJoinConditionRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  /**
+    * Rebuilds the batch program via `buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)`,
+    * appends a program containing only [[SimplifyJoinConditionRule]], and registers the two
+    * table sources (`MyTable1` with a, b, c and `MyTable2` with d, e, f) used by the queries.
+    */
+  @Before
+  def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+
+    val batchProgram = util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get
+    // The builder chain is passed directly as the argument so its type is inferred
+    // exactly as before.
+    batchProgram.addLast(
+      "SimplifyJoinConditionRule",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(SimplifyJoinConditionRule.INSTANCE))
+        .build())
+
+    util.addTableSource[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
+  }
+
+  @Test
+  def testSimplifyJoinCondition(): Unit = {
+    // Both OR branches of the join condition repeat the conjunct `d = a`.
+    util.verifyPlan(
+      "SELECT d FROM MyTable1 JOIN MyTable2 ON (d = a AND a > 2) OR (d = a AND b = 1)")
+  }
+
+  @Test
+  def testSimplifyJoinConditionFromSubQuery(): Unit = {
+    // Same shape of condition, but written inside a scalar sub-query that references
+    // columns of the outer table.
+    val query =
+      """
+        |SELECT a FROM MyTable1 WHERE b = (
+        |    SELECT COUNT(*) FROM MyTable2 WHERE (d = a AND d < 2) OR (d = a AND b = 5))
+      """.stripMargin
+    util.verifyPlan(query)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 0b210f5..ed5feed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -189,6 +189,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
   @Test
   def testJoinOnDifferentKeyTypes(): Unit = {
     // Implicit type coercion of the join keys is expected to fail: they have no common type.
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("VARCHAR(65536) and INTEGER does not have common type now")
     streamUtil.verifyPlan("SELECT * FROM MyTable AS T JOIN temporalTest "
       + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index e304407..fedd7e8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -26,13 +26,15 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
 import org.apache.flink.table.`type`.{InternalType, TypeConverters}
 import org.apache.flink.table.api._
-import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv, _}
 import org.apache.flink.table.calcite.CalciteConfig
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable}
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, CollectRowTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
@@ -47,6 +49,8 @@ import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TestName}
 
+import _root_.java.util.{Set => JSet}
+
 import _root_.scala.collection.JavaConversions._
 
 /**
@@ -134,24 +138,22 @@ abstract class TableTestUtil(test: TableTestBase) {
   }
 
   /**
-    * Create a [[TestTableSource]] with the given schema,
+    * Creates a [[TestTableSource]] with the given schema, table stats and unique keys,
     * and registers this TableSource under given name into the TableEnvironment's catalog.
     *
     * @param name table name
     * @param types field types
     * @param names field names
+    * @param tableStats optional table stats to register with the table, defaults to None
+    * @param uniqueKeys optional unique keys, each given as a set of strings, defaults to None
     * @return returns the registered [[Table]].
     */
   def addTableSource(
       name: String,
       types: Array[TypeInformation[_]],
-      names: Array[String]): Table = {
-    val tableEnv = getTableEnv
-    val schema = new TableSchema(names, types)
-    val tableSource = new TestTableSource(schema)
-    tableEnv.registerTableSource(name, tableSource)
-    tableEnv.scan(name)
-  }
+      names: Array[String],
+      tableStats: Option[TableStats] = None,
+      uniqueKeys: Option[JSet[_ <: JSet[String]]] = None): Table
 
   /**
     * Create a [[DataStream]] with the given schema,
@@ -472,6 +474,24 @@ case class StreamTableTestUtil(test: TableTestBase) extends TableTestUtil(test)
     tableEnv.scan(name)
   }
 
+  /**
+    * Registers a [[TestTableSource]] for the given field names and types as a
+    * [[StreamTableSourceTable]] under `name`, then returns a scan of that table.
+    *
+    * The attached [[FlinkStatistic]] is built from `tableStats` and `uniqueKeys`; an empty
+    * option is handed to the builder as `null`.
+    */
+  override def addTableSource(
+      name: String,
+      types: Array[TypeInformation[_]],
+      names: Array[String],
+      tableStats: Option[TableStats] = None,
+      uniqueKeys: Option[JSet[_ <: JSet[String]]] = None): Table = {
+    val env = getTableEnv
+    val source = new TestTableSource(new TableSchema(names, types))
+    val statsBuilder = FlinkStatistic.builder().tableStats(tableStats.orNull)
+    val stats = statsBuilder.uniqueKeys(uniqueKeys.orNull).build()
+    env.registerTableInternal(name, new StreamTableSourceTable[BaseRow](source, stats))
+    env.scan(name)
+  }
+
   def verifyPlanWithTrait(): Unit = {
     doVerifyPlan(
       SqlExplainLevel.EXPPLAN_ATTRIBUTES,
@@ -572,6 +592,24 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
     tableEnv.scan(name)
   }
 
+  /**
+    * Registers a [[TestTableSource]] for the given field names and types as a
+    * [[BatchTableSourceTable]] under `name`, then returns a scan of that table.
+    *
+    * The attached [[FlinkStatistic]] is built from `tableStats` and `uniqueKeys`; an empty
+    * option is handed to the builder as `null`.
+    */
+  override def addTableSource(
+      name: String,
+      types: Array[TypeInformation[_]],
+      names: Array[String],
+      tableStats: Option[TableStats] = None,
+      uniqueKeys: Option[JSet[_ <: JSet[String]]] = None): Table = {
+    val env = getTableEnv
+    val source = new TestTableSource(new TableSchema(names, types))
+    val statsBuilder = FlinkStatistic.builder().tableStats(tableStats.orNull)
+    val stats = statsBuilder.uniqueKeys(uniqueKeys.orNull).build()
+    env.registerTableInternal(name, new BatchTableSourceTable[BaseRow](source, stats))
+    env.scan(name)
+  }
+
   def buildBatchProgram(firstProgramNameToRemove: String): Unit = {
     val program = FlinkBatchProgram.buildProgram(tableEnv.getConfig.getConf)
     var startRemove = false