You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/22 01:14:57 UTC
[flink] branch master updated: [FLINK-12509] [table-planner-blink]
Introduce planner rules about non semi/anti join, which includes:
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b36c54d [FLINK-12509] [table-planner-blink] Introduce planner rules about non semi/anti join, which includes:
b36c54d is described below
commit b36c54dd06122b7ddf10048ca72f1b5e448b4cd2
Author: godfrey he <go...@163.com>
AuthorDate: Wed May 22 09:14:46 2019 +0800
[FLINK-12509] [table-planner-blink] Introduce planner rules about non semi/anti join, which includes:
1. `JoinConditionEqualityTransferRule` that converts conditions to the left or right table's own independent filter as much as possible.
2. `JoinConditionTypeCoerceRule` that coerces both sides of the EQUALS(`=`) operator in a Join condition to the same type, ignoring nullability.
3. `JoinDependentConditionDerivationRule` that extracts some sub-conditions in the Join OR condition which can be pushed into join's inputs further by FlinkFilterJoinRule.
4. `JoinDeriveNullFilterRule` that filters null values before the join if the count of null values exceeds a threshold.
This closes #8440
---
.../flink/table/api/PlannerConfigOptions.java | 7 +
.../plan/rules/logical/FlinkFilterJoinRule.java | 7 +-
.../plan/optimize/program/FlinkBatchProgram.scala | 29 +-
.../table/plan/rules/FlinkBatchRuleSets.scala | 33 +-
.../table/plan/rules/FlinkStreamRuleSets.scala | 26 +-
.../JoinConditionEqualityTransferRule.scala | 172 ++++++++
.../logical/JoinConditionTypeCoerceRule.scala | 128 ++++++
.../JoinDependentConditionDerivationRule.scala | 145 +++++++
.../rules/logical/JoinDeriveNullFilterRule.scala | 94 +++++
.../rules/logical/SimplifyJoinConditionRule.scala | 70 ++++
.../sql/join/BroadcastHashSemiAntiJoinTest.xml | 13 +-
.../plan/batch/sql/join/NestedLoopJoinTest.xml | 52 +--
.../batch/sql/join/NestedLoopSemiAntiJoinTest.xml | 41 +-
.../table/plan/batch/sql/join/SemiAntiJoinTest.xml | 43 +-
.../plan/batch/sql/join/ShuffledHashJoinTest.xml | 27 ++
.../sql/join/ShuffledHashSemiAntiJoinTest.xml | 15 +-
.../plan/batch/sql/join/SortMergeJoinTest.xml | 53 +--
.../batch/sql/join/SortMergeSemiAntiJoinTest.xml | 15 +-
.../plan/rules/logical/FlinkFilterJoinRuleTest.xml | 435 +++++++++++++++++++++
.../JoinConditionEqualityTransferRuleTest.xml | 361 +++++++++++++++++
.../logical/JoinConditionTypeCoerceRuleTest.xml | 266 +++++++++++++
.../JoinDependentConditionDerivationRuleTest.xml | 208 ++++++++++
.../rules/logical/JoinDeriveNullFilterRuleTest.xml | 212 ++++++++++
.../logical/SimplifyJoinConditionRuleTest.xml | 75 ++++
.../logical/subquery/SubQuerySemiJoinTest.xml | 167 ++++----
.../plan/stream/sql/join/SemiAntiJoinTest.xml | 30 +-
.../table/plan/stream/sql/join/WindowJoinTest.xml | 20 +-
.../batch/sql/join/BroadcastHashJoinTest.scala | 14 +-
.../table/plan/batch/sql/join/JoinTestBase.scala | 16 +-
.../table/plan/batch/sql/join/LookupJoinTest.scala | 9 +-
.../plan/batch/sql/join/ShuffledHashJoinTest.scala | 7 -
.../rules/logical/FlinkFilterJoinRuleTest.scala | 157 ++++++++
.../JoinConditionEqualityTransferRuleTest.scala | 144 +++++++
.../logical/JoinConditionTypeCoerceRuleTest.scala | 114 ++++++
.../JoinDependentConditionDerivationRuleTest.scala | 118 ++++++
.../logical/JoinDeriveNullFilterRuleTest.scala | 117 ++++++
.../logical/SimplifyJoinConditionRuleTest.scala | 66 ++++
.../plan/stream/sql/join/LookupJoinTest.scala | 2 +
.../apache/flink/table/util/TableTestBase.scala | 56 ++-
39 files changed, 3293 insertions(+), 271 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
index 30dc16f..c2bdf85 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
@@ -79,6 +79,13 @@ public class PlannerConfigOptions {
.withDescription("When the semi-side of semi/anti join can distinct a lot of data in advance," +
" we will add distinct node before semi/anti join.");
+ public static final ConfigOption<Long> SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD =
+ key("sql.optimizer.join.null.filter.threshold")
+ .defaultValue(2000000L)
+ .withDescription("If the source of InnerJoin has nullCount more than this value, " +
+ "it will add a null filter (possibly be pushDowned) before the join, filter" +
+ " null values to avoid the impact of null values on the single join node.");
+
public static final ConfigOption<Boolean> SQL_OPTIMIZER_DATA_SKEW_DISTINCT_AGG_ENABLED =
key("sql.optimizer.data-skew.distinct-agg.enabled")
.defaultValue(false)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index dd5be0f..3b593de 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -48,7 +48,7 @@ import static org.apache.calcite.plan.RelOptUtil.conjunctions;
* This rules is copied from Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule}.
* Modification:
* - Use `FlinkRelOptUtil.classifyFilters` to support SEMI/ANTI join
- * - TODO Handles the ON condition of anti-join can not be pushed down
+ * - Handles the case where the ON condition of an anti-join cannot be pushed down
*/
/**
@@ -192,10 +192,13 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
}
}
+ boolean isAntiJoin = joinType == JoinRelType.ANTI;
+
// Try to push down filters in ON clause. A ON clause filter can only be
// pushed down if it does not affect the non-matching set, i.e. it is
// not on the side which is preserved.
- if (FlinkRelOptUtil.classifyFilters(
+ // An ON clause filter of an anti-join cannot be pushed down.
+ if (!isAntiJoin && FlinkRelOptUtil.classifyFilters(
join,
joinFilters,
joinType,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 5f03e5d..0191d79 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -33,6 +33,7 @@ object FlinkBatchProgram {
val DECORRELATE = "decorrelate"
val DEFAULT_REWRITE = "default_rewrite"
val PREDICATE_PUSHDOWN = "predicate_pushdown"
+ val JOIN_REWRITE = "join_rewrite"
val WINDOW = "window"
val LOGICAL = "logical"
val LOGICAL_REWRITE = "logical_rewrite"
@@ -105,11 +106,20 @@ object FlinkBatchProgram {
PREDICATE_PUSHDOWN,
FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
.addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
- .build(), "filter rules")
+ FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkBatchRuleSets.JOIN_PREDICATE_REWRITE_RULES)
+ .build(), "join predicate rewrite")
+ .addProgram(
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES)
+ .build(), "other predicate rewrite")
+ .setIterations(5).build())
.addProgram(
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
@@ -118,6 +128,15 @@ object FlinkBatchProgram {
.build(), "prune empty after predicate push down")
.build())
+ // join rewrite
+ chainedProgram.addLast(
+ JOIN_REWRITE,
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkBatchRuleSets.JOIN_COND_EQUAL_TRANSFER_RULES)
+ .build())
+
// window rewrite
chainedProgram.addLast(
WINDOW,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index fa48ecf..bf6f247 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -35,6 +35,7 @@ object FlinkBatchRuleSets {
val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
FlinkSubQueryRemoveRule.FILTER,
+ JoinConditionTypeCoerceRule.INSTANCE,
FlinkJoinPushExpressionsRule.INSTANCE
)
@@ -88,10 +89,21 @@ object FlinkBatchRuleSets {
)
/**
+ * RuleSet to simplify predicate expressions in filters and joins
+ */
+ private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
+ SimplifyFilterConditionRule.INSTANCE,
+ SimplifyJoinConditionRule.INSTANCE,
+ JoinConditionTypeCoerceRule.INSTANCE,
+ JoinPushExpressionsRule.INSTANCE
+ )
+
+ /**
* RuleSet to normalize plans for batch
*/
val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
- REWRITE_COALESCE_RULES.asScala ++
+ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
+ REWRITE_COALESCE_RULES.asScala ++
REDUCE_EXPRESSION_RULES.asScala ++
List(
// Transform window to LogicalWindowAggregate
@@ -123,12 +135,9 @@ object FlinkBatchRuleSets {
FilterMergeRule.INSTANCE
)
- /**
- * Ruleset to simplify expressions
- */
- private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
- // TODO: add filter simply and join condition simplify rules
- JoinPushExpressionsRule.INSTANCE
+ val JOIN_PREDICATE_REWRITE_RULES: RuleSet = RuleSets.ofList(
+ JoinDependentConditionDerivationRule.INSTANCE,
+ JoinDeriveNullFilterRule.INSTANCE
)
/**
@@ -136,7 +145,7 @@ object FlinkBatchRuleSets {
*/
val FILTER_PREPARE_RULES: RuleSet = RuleSets.ofList((
FILTER_RULES.asScala
- // simplify expressions
+ // simplify predicate expressions in filters and joins
++ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala
// reduce expressions in filters and joins
++ REDUCE_EXPRESSION_RULES.asScala
@@ -162,7 +171,7 @@ object FlinkBatchRuleSets {
val PROJECT_RULES: RuleSet = RuleSets.ofList(
// push a projection past a filter
ProjectFilterTransposeRule.INSTANCE,
- // push a projection to the children of a join
+ // push a projection to the children of a non semi/anti join
// push all expressions to handle the time indicator correctly
new FlinkProjectJoinTransposeRule(
PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER),
@@ -189,6 +198,12 @@ object FlinkBatchRuleSets {
WindowPropertiesRules.WINDOW_PROPERTIES_HAVING_RULE
)
+ val JOIN_COND_EQUAL_TRANSFER_RULES: RuleSet = RuleSets.ofList((
+ RuleSets.ofList(JoinConditionEqualityTransferRule.INSTANCE).asScala ++
+ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
+ FILTER_RULES.asScala
+ ).asJava)
+
/**
* RuleSet to do logical optimize.
* This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]].
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 1aacac8..cbab9e3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -35,6 +35,7 @@ object FlinkStreamRuleSets {
val SEMI_JOIN_RULES: RuleSet = RuleSets.ofList(
SimplifyFilterConditionRule.EXTENDED,
FlinkSubQueryRemoveRule.FILTER,
+ JoinConditionTypeCoerceRule.INSTANCE,
FlinkJoinPushExpressionsRule.INSTANCE
)
@@ -88,10 +89,21 @@ object FlinkStreamRuleSets {
)
/**
+ * RuleSet to simplify predicate expressions in filters and joins
+ */
+ private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
+ SimplifyFilterConditionRule.INSTANCE,
+ SimplifyJoinConditionRule.INSTANCE,
+ JoinConditionTypeCoerceRule.INSTANCE,
+ JoinPushExpressionsRule.INSTANCE
+ )
+
+ /**
* RuleSet to normalize plans for stream
*/
val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
- REWRITE_COALESCE_RULES.asScala ++
+ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
+ REWRITE_COALESCE_RULES.asScala ++
REDUCE_EXPRESSION_RULES.asScala ++
List(
StreamLogicalWindowAggregateRule.INSTANCE,
@@ -128,19 +140,11 @@ object FlinkStreamRuleSets {
)
/**
- * Ruleset to simplify expressions
- */
- private val PREDICATE_SIMPLIFY_EXPRESSION_RULES: RuleSet = RuleSets.ofList(
- // TODO: add filter simply and join condition simplify rules
- FlinkJoinPushExpressionsRule.INSTANCE
- )
-
- /**
* RuleSet to do predicate pushdown
*/
val FILTER_PREPARE_RULES: RuleSet = RuleSets.ofList((
FILTER_RULES.asScala
- // simplify expressions
+ // simplify predicate expressions in filters and joins
++ PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala
// reduce expressions in filters and joins
++ REDUCE_EXPRESSION_RULES.asScala
@@ -165,7 +169,7 @@ object FlinkStreamRuleSets {
val PROJECT_RULES: RuleSet = RuleSets.ofList(
// push a projection past a filter
ProjectFilterTransposeRule.INSTANCE,
- // push a projection to the children of a join
+ // push a projection to the children of a non semi/anti join
// push all expressions to handle the time indicator correctly
new FlinkProjectJoinTransposeRule(
PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRule.scala
new file mode 100644
index 0000000..910f127
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRule.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.EQUALS
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+ * Planner rule that converts a Join's conditions into independent filters on the left or right
+ * input as much as possible, so that the filter-push-down rules can push those filters below
+ * the join.
+ *
+ * <p>e.g. join condition: l_a = r_b and l_a = r_c.
+ * The l_a is a field from left input, both r_b and r_c are fields from the right input.
+ * After rewrite, condition will be: l_a = r_b and r_b = r_c.
+ * r_b = r_c can be pushed down to the right input.
+ */
+class JoinConditionEqualityTransferRule extends RelOptRule(
+ operand(classOf[Join], any),
+ "JoinConditionEqualityTransferRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: Join = call.rel(0)
+ val joinType = join.getJoinType
+ if (joinType != JoinRelType.INNER && joinType != JoinRelType.SEMI) {
+ return false
+ }
+
+ val (optimizableFilters, _) = partitionJoinFilters(join)
+ val groups = getEquiFilterRelationshipGroup(optimizableFilters)
+ groups.exists(_.size > 2)
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val join: Join = call.rel(0)
+ val (optimizableFilters, remainFilters) = partitionJoinFilters(join)
+ val (equiFiltersToOpt, equiFiltersNotOpt) =
+ getEquiFilterRelationshipGroup(optimizableFilters).partition(_.size > 2)
+
+ val builder = call.builder()
+ val rexBuilder = builder.getRexBuilder
+ val newEquiJoinFilters = mutable.ListBuffer[RexNode]()
+
+ // add equiFiltersNotOpt.
+ equiFiltersNotOpt.foreach { refs =>
+ require(refs.size == 2)
+ newEquiJoinFilters += rexBuilder.makeCall(EQUALS, refs.head, refs.last)
+ }
+
+ // new opt filters.
+ equiFiltersToOpt.foreach { refs =>
+ // partition the InputRefs into left-side and right-side refs.
+ val (leftRefs, rightRefs) = refs.partition(fromJoinLeft(join, _))
+ val rexCalls = new mutable.ArrayBuffer[RexNode]()
+
+ // equalities among the refs that come from the same side.
+ rexCalls ++= makeCalls(rexBuilder, leftRefs)
+ rexCalls ++= makeCalls(rexBuilder, rightRefs)
+
+ // a single equality linking the left side and the right side.
+ if (leftRefs.nonEmpty && rightRefs.nonEmpty) {
+ rexCalls += rexBuilder.makeCall(EQUALS, leftRefs.head, rightRefs.head)
+ }
+
+ // add to newEquiJoinFilters (note: no explicit deduplication is performed here).
+ rexCalls.foreach(call => newEquiJoinFilters += call)
+ }
+
+ val newJoinFilter = builder.and(remainFilters :+
+ FlinkRexUtil.simplify(rexBuilder, builder.and(newEquiJoinFilters)))
+ val newJoin = join.copy(
+ join.getTraitSet,
+ newJoinFilter,
+ join.getLeft,
+ join.getRight,
+ join.getJoinType,
+ join.isSemiJoinDone)
+
+ call.transformTo(newJoin)
+ }
+
+ /**
+ * Returns true if the given input ref is from join left, else false.
+ */
+ private def fromJoinLeft(join: Join, ref: RexInputRef): Boolean = {
+ require(join.getSystemFieldList.size() == 0)
+ ref.getIndex < join.getLeft.getRowType.getFieldCount
+ }
+
+ /**
+ * Partition join condition to leftRef-rightRef equals and others.
+ */
+ def partitionJoinFilters(join: Join): (Seq[RexNode], Seq[RexNode]) = {
+ val conjunctions = RelOptUtil.conjunctions(join.getCondition)
+ conjunctions.partition {
+ case call: RexCall if call.isA(SqlKind.EQUALS) =>
+ (call.operands.head, call.operands.last) match {
+ case (ref1: RexInputRef, ref2: RexInputRef) =>
+ val isLeft1 = fromJoinLeft(join, ref1)
+ val isLeft2 = fromJoinLeft(join, ref2)
+ isLeft1 != isLeft2
+ case _ => false
+ }
+ case _ => false
+ }
+ }
+
+ /**
+ * Put fields to a group that have equivalence relationships.
+ */
+ def getEquiFilterRelationshipGroup(equiJoinFilters: Seq[RexNode]): Seq[Seq[RexInputRef]] = {
+ val filterSets = mutable.ArrayBuffer[mutable.HashSet[RexInputRef]]()
+ equiJoinFilters.foreach {
+ case call: RexCall =>
+ require(call.isA(SqlKind.EQUALS))
+ val left = call.operands.head.asInstanceOf[RexInputRef]
+ val right = call.operands.last.asInstanceOf[RexInputRef]
+ val set = filterSets.find(set => set.contains(left) || set.contains(right)) match {
+ case Some(s) => s
+ case None =>
+ val s = new mutable.HashSet[RexInputRef]()
+ filterSets += s
+ s
+ }
+ set += left
+ set += right
+ }
+
+ filterSets.map(_.toSeq)
+ }
+
+ /**
+ * Builds EQUALS calls linking the first inputRef to each remaining one, so that all are related.
+ */
+ def makeCalls(rexBuilder: RexBuilder, nodes: Seq[RexInputRef]): Seq[RexNode] = {
+ val calls = new mutable.ArrayBuffer[RexNode]()
+ if (nodes.length > 1) {
+ val rex = nodes.head
+ nodes.drop(1).foreach(calls += rexBuilder.makeCall(EQUALS, rex, _))
+ }
+ calls
+ }
+}
+
+object JoinConditionEqualityTransferRule {
+ val INSTANCE = new JoinConditionEqualityTransferRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRule.scala
new file mode 100644
index 0000000..8e98c15
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRule.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.Join
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.`type`.SqlTypeUtil
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+ * Planner rule that coerces both sides of the EQUALS(`=`) operator in a Join condition
+ * to the same type, ignoring nullability.
+ *
+ * <p>In most cases the type coercion has already been done, either by implicit type coercion
+ * during validation or during the sqlNode-to-relNode conversion. This rule only re-checks, to
+ * ensure both sides of an equality have strictly the same type, so that during a HashJoin
+ * shuffle the same value yields the same hashcode.
+ */
+class JoinConditionTypeCoerceRule extends RelOptRule(
+ operand(classOf[Join], any),
+ "JoinConditionTypeCoerceRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: Join = call.rel(0)
+ if (join.getCondition.isAlwaysTrue) {
+ return false
+ }
+ val typeFactory = call.builder().getTypeFactory
+ hasEqualsRefsOfDifferentTypes(typeFactory, join.getCondition)
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val join: Join = call.rel(0)
+ val builder = call.builder()
+ val rexBuilder = builder.getRexBuilder
+ val typeFactory = builder.getTypeFactory
+
+ val newJoinFilters = mutable.ArrayBuffer[RexNode]()
+ val joinFilters = RelOptUtil.conjunctions(join.getCondition)
+ joinFilters.foreach {
+ case c: RexCall if c.isA(SqlKind.EQUALS) =>
+ (c.operands.head, c.operands.last) match {
+ case (ref1: RexInputRef, ref2: RexInputRef)
+ if !SqlTypeUtil.equalSansNullability(
+ typeFactory,
+ ref1.getType,
+ ref2.getType) =>
+ val refList = ref1 :: ref2 :: Nil
+ val targetType = typeFactory.leastRestrictive(refList.map(ref => ref.getType))
+ if (targetType == null) {
+ throw new TableException(
+ s"${ref1.getType} and ${ref2.getType} does not have common type now")
+ }
+ newJoinFilters += builder.equals(
+ rexBuilder.ensureType(targetType, ref1, true),
+ rexBuilder.ensureType(targetType, ref2, true))
+ case _ =>
+ newJoinFilters += c
+ }
+ case r: RexNode =>
+ newJoinFilters += r
+ }
+
+ val newCondExp = builder.and(
+ FlinkRexUtil.simplify(rexBuilder, builder.and(newJoinFilters)))
+
+ val newJoin = join.copy(
+ join.getTraitSet,
+ newCondExp,
+ join.getLeft,
+ join.getRight,
+ join.getJoinType,
+ join.isSemiJoinDone)
+
+ call.transformTo(newJoin)
+ }
+
+ /**
+ * Returns true if two input refs of an equal call have different types in join condition,
+ * else false.
+ */
+ private def hasEqualsRefsOfDifferentTypes(
+ typeFactory: RelDataTypeFactory,
+ predicate: RexNode): Boolean = {
+ val conjunctions = RelOptUtil.conjunctions(predicate)
+ conjunctions.exists {
+ case c: RexCall if c.isA(SqlKind.EQUALS) =>
+ (c.operands.head, c.operands.last) match {
+ case (ref1: RexInputRef, ref2: RexInputRef) =>
+ !SqlTypeUtil.equalSansNullability(
+ typeFactory,
+ ref1.getType,
+ ref2.getType)
+ case _ => false
+ }
+ case _ => false
+ }
+ }
+}
+
+object JoinConditionTypeCoerceRule {
+ val INSTANCE = new JoinConditionTypeCoerceRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRule.scala
new file mode 100644
index 0000000..2222991
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRule.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+ * Planner Rule that extracts some sub-conditions in the Join OR condition that can be pushed
+ * into join inputs by [[FlinkFilterJoinRule]].
+ *
+ * <p>For example, there is a join query (table A join table B):
+ * {{{
+ * SELECT * FROM A, B WHERE A.f1 = B.f1 AND ((A.f2 = 'aaa1' AND B.f2 = 'bbb1') OR
+ * (A.f2 = 'aaa2' AND B.f2 = 'bbb2'))
+ * }}}
+ *
+ * <p>Hence the query rewards optimizers that can analyze complex join conditions which cannot be
+ * pushed below the join, but still derive filters from such join conditions. It could immediately
+ * filter the scan(A) with the condition: (A.f2 = 'aaa1' OR A.f2 = 'aaa2').
+ *
+ * <p>After join condition dependent optimization, the query will be:
+ * {{{
+ * SELECT * FROM A, B WHERE A.f1 = B.f1 AND
+ * ((A.f2 = 'aaa1' AND B.f2 = 'bbb1') OR (A.f2 = 'aaa2' AND B.f2 = 'bbb2'))
+ * AND (A.f2 = 'aaa1' OR A.f2 = 'aaa2') AND (B.f2 = 'bbb1' OR B.f2 = 'bbb2')
+ * }}}.
+ *
+ * <p>Note: This class can only be used in HepPlanner with RULE_SEQUENCE.
+ */
+class JoinDependentConditionDerivationRule
+ extends RelOptRule(
+ operand(classOf[LogicalJoin], any()),
+ "JoinDependentConditionDerivationRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val join: LogicalJoin = call.rel(0)
+ // TODO support more join types
+ join.getJoinType == JoinRelType.INNER
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val join: LogicalJoin = call.rel(0)
+ val conjunctions = RelOptUtil.conjunctions(join.getCondition)
+
+ val builder = call.builder()
+ val additionalConditions = new mutable.ArrayBuffer[RexNode]
+
+ // and
+ conjunctions.foreach { conjunctionRex =>
+ val disjunctions = RelOptUtil.disjunctions(conjunctionRex)
+
+ // Split the OR: (A.f2 = 'aaa1' and B.f2 = 'bbb1') or (A.f2 = 'aaa2' and B.f2 = 'bbb2')
+ if (disjunctions.size() > 1) {
+
+ val leftDisjunctions = new mutable.ArrayBuffer[RexNode]
+ val rightDisjunctions = new mutable.ArrayBuffer[RexNode]
+ disjunctions.foreach { disjunctionRex =>
+
+ val leftConjunctions = new mutable.ArrayBuffer[RexNode]
+ val rightConjunctions = new mutable.ArrayBuffer[RexNode]
+
+ // Split the AND: A.f2 = 'aaa1' and B.f2 = 'bbb1'
+ RelOptUtil.conjunctions(disjunctionRex).foreach { cond =>
+ val rCols = RelOptUtil.InputFinder.bits(cond).map(_.intValue())
+
+ // May have multiple conditions, e.g. A.f2 = 'aaa1' and A.f3 = 'aaa3' and B.f2 = 'bbb1'
+ if (rCols.forall(fromJoinLeft(join, _))) {
+ leftConjunctions += cond
+ } else if (rCols.forall(!fromJoinLeft(join, _))) {
+ rightConjunctions += cond
+ }
+ }
+
+ // builder.and yields TRUE when the list of conjunctions is empty.
+ leftDisjunctions += builder.and(leftConjunctions)
+ rightDisjunctions += builder.and(rightConjunctions)
+ }
+
+ // TODO Consider whether it is worth doing a filter if we have histogram.
+ if (leftDisjunctions.nonEmpty) {
+ additionalConditions += builder.or(leftDisjunctions)
+ }
+ if (rightDisjunctions.nonEmpty) {
+ additionalConditions += builder.or(rightDisjunctions)
+ }
+ }
+ }
+
+ if (additionalConditions.nonEmpty) {
+ val newCondExp = FlinkRexUtil.simplify(
+ builder.getRexBuilder,
+ builder.and(conjunctions ++ additionalConditions))
+
+ if (!newCondExp.toString.equals(join.getCondition.toString)) {
+ val newJoin = join.copy(
+ join.getTraitSet,
+ newCondExp,
+ join.getLeft,
+ join.getRight,
+ join.getJoinType,
+ join.isSemiJoinDone)
+
+ call.transformTo(newJoin)
+ }
+ }
+ }
+
+ /**
+ * Returns true if the given index is from join left, else false.
+ */
+ private def fromJoinLeft(join: Join, index: Int): Boolean = {
+ require(join.getSystemFieldList.size() == 0)
+ index < join.getLeft.getRowType.getFieldCount
+ }
+
+}
+
+object JoinDependentConditionDerivationRule {
+ val INSTANCE = new JoinDependentConditionDerivationRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRule.scala
new file mode 100644
index 0000000..3143bdf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRule.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.PlannerConfigOptions
+import org.apache.flink.table.plan.metadata.FlinkRelMetadataQuery
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableIntList
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule that adds an IS NOT NULL filter on a join key below an inner join when the null
+  * count of that key in the join input is greater than
+  * [[PlannerConfigOptions.SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD]].
+  *
+  * A null join key can never match in an inner join, and too many null values cause a single
+  * point skew, so the not-null filter is pushed down into the child node of the join.
+  */
+class JoinDeriveNullFilterRule
+  extends RelOptRule(
+    operand(classOf[LogicalJoin], any()),
+    "JoinDeriveNullFilterRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: Join = call.rel(0)
+    val isInnerJoin = join.getJoinType == JoinRelType.INNER
+    isInnerJoin && join.analyzeCondition.pairs().nonEmpty
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: LogicalJoin = call.rel(0)
+    val cluster = join.getCluster
+    val rexBuilder = cluster.getRexBuilder
+    val mq = FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery)
+    val threshold = FlinkRelOptUtil.getTableConfigFromContext(join).getConf.getLong(
+      PlannerConfigOptions.SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD)
+
+    // Puts a filter on top of `input` with one IS NOT NULL predicate per key whose null count
+    // is above the threshold; yields `input` itself when no key qualifies.
+    def withNotNullFilter(input: RelNode, keys: ImmutableIntList): RelNode = {
+      val relBuilder = call.builder()
+      val notNullPredicates = new mutable.ArrayBuffer[RexNode]
+      for (key <- keys) {
+        val nullCount = mq.getColumnNullCount(input, key)
+        if (nullCount != null && nullCount > threshold) {
+          notNullPredicates += relBuilder.call(
+            SqlStdOperatorTable.IS_NOT_NULL, rexBuilder.makeInputRef(input, key))
+        }
+      }
+      if (notNullPredicates.isEmpty) input
+      else relBuilder.push(input).filter(notNullPredicates).build()
+    }
+
+    val joinInfo = join.analyzeCondition
+    val newLeft = withNotNullFilter(join.getLeft, joinInfo.leftKeys)
+    val newRight = withNotNullFilter(join.getRight, joinInfo.rightKeys)
+
+    // Reference equality on both sides means that no filter was added.
+    val inputsUnchanged = (newLeft eq join.getLeft) && (newRight eq join.getRight)
+    if (!inputsUnchanged) {
+      call.transformTo(join.copy(join.getTraitSet, Seq(newLeft, newRight)))
+    }
+  }
+}
+
+object JoinDeriveNullFilterRule {
+  val INSTANCE: JoinDeriveNullFilterRule = new JoinDeriveNullFilterRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRule.scala
new file mode 100644
index 0000000..2eff11a
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRule.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex._
+
+/**
+  * Planner rule that applies various simplifying transformations to a join condition, e.g.
+  * - removes duplicated expressions: a=b AND b=a -> a=b,
+  * - simplifies boolean expressions: x = 1 AND FALSE -> FALSE,
+  * - simplifies cast expressions: CAST('123' as integer) -> 123.
+  * The simplified condition is additionally passed through RexUtil.pullFactors.
+  */
+class SimplifyJoinConditionRule
+  extends RelOptRule(
+    operand(classOf[LogicalJoin], any()),
+    "SimplifyJoinConditionRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: LogicalJoin = call.rel(0)
+    val oldCondition = join.getCondition
+
+    // An always-true condition is left untouched.
+    if (!oldCondition.isAlwaysTrue) {
+      val rexBuilder = join.getCluster.getRexBuilder
+      val simplified = RexUtil.pullFactors(
+        rexBuilder, FlinkRexUtil.simplify(rexBuilder, oldCondition))
+
+      // Only fire when the string form of the condition changed.
+      if (!simplified.toString.equals(oldCondition.toString)) {
+        val simplifiedJoin = join.copy(
+          join.getTraitSet,
+          simplified,
+          join.getLeft,
+          join.getRight,
+          join.getJoinType,
+          join.isSemiJoinDone)
+
+        call.transformTo(simplifiedJoin)
+        // Importance 0.0 is set on the original join after the transformation.
+        call.getPlanner.setImportance(join, 0.0)
+      }
+    }
+  }
+}
+
+object SimplifyJoinConditionRule {
+  val INSTANCE: SimplifyJoinConditionRule = new SimplifyJoinConditionRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
index 789b29d..a2bb487 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
@@ -863,13 +863,12 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], isBroadcast=[true], build=[right])
- :- Calc(select=[a, b, c, CAST(b) AS b0])
- : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[broadcast])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], isBroadcast=[true], build=[right])
+:- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[broadcast])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml
index a0d0dc3..aefff81 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopJoinTest.xml
@@ -179,6 +179,33 @@ NestedLoopJoin(joinType=[FullOuterJoin], where=[<>(a, d)], select=[d, e, f, g, h
]]>
</Resource>
</TestCase>
+ <TestCase name="testFullOuterWithUsing">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[COALESCE($0, $3)], b=[$1], c=[$2], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CASE(IS NOT NULL(a), a, a0) AS $f0, b, c, b0, c0])
++- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[left])
+ :- Exchange(distribution=[single])
+ : +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[single])
+ +- TableSourceScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testInnerJoinWithEquiAndLocalPred">
<Resource name="sql">
<![CDATA[SELECT c, g FROM MyTable2 INNER JOIN MyTable1 ON a = d AND d < 2]]>
@@ -371,31 +398,6 @@ NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, d], build=[left])
]]>
</Resource>
</TestCase>
- <TestCase name="testJoinNonMatchingKeyTypes">
- <Resource name="sql">
- <![CDATA[SELECT c, g FROM MyTable1, MyTable2 WHERE a = g]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalFilter(condition=[=($0, $6)])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[c, g])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[=(a, g)], select=[a, c, g], build=[right])
- :- Calc(select=[a, c])
- : +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[broadcast])
- +- Calc(select=[g])
- +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testLeftOuterJoinNoEquiPred">
<Resource name="sql">
<![CDATA[SELECT * FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a <> d]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
index 09a0420..f825d25 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
@@ -1053,13 +1053,12 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c])
-+- NestedLoopJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], build=[right])
- :- Calc(select=[a, b, c, CAST(b) AS b0])
- : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[broadcast])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[right])
+:- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[broadcast])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -2011,13 +2010,14 @@ LogicalFilter(condition=[<($cor0.a, 10)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[true], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[<(a, 10)])
-: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
- +- LocalHashAggregate(select=[])
- +- Calc(select=[])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[$f3], select=[a, b, c, $f3], build=[right])
+ :- Calc(select=[a, b, c, <(a, 10) AS $f3])
+ : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[broadcast])
+ +- LocalHashAggregate(select=[])
+ +- Calc(select=[])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -2348,12 +2348,13 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[OR(=(a, d), IS NULL(a), IS NULL(d))], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[>(b, 10)])
-: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, d), IS NULL(a), IS NULL(d)), $f3)], select=[a, b, c, $f3], build=[right])
+ :- Calc(select=[a, b, c, >(b, 10) AS $f3])
+ : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml
index e8efee0..117d880 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SemiAntiJoinTest.xml
@@ -1082,14 +1082,13 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], build=[right])
- :- Exchange(distribution=[hash[a, b0]])
- : +- Calc(select=[a, b, c, CAST(b) AS b0])
- : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[hash[d, d]])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[left])
+:- Exchange(distribution=[hash[a]])
+: +- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -2073,13 +2072,14 @@ LogicalFilter(condition=[<($cor0.a, 10)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[true], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[<(a, 10)])
-: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
- +- LocalHashAggregate(select=[])
- +- Calc(select=[])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[$f3], select=[a, b, c, $f3], build=[right])
+ :- Calc(select=[a, b, c, <(a, 10) AS $f3])
+ : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[broadcast])
+ +- LocalHashAggregate(select=[])
+ +- Calc(select=[])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -2419,12 +2419,13 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-NestedLoopJoin(joinType=[LeftAntiJoin], where=[OR(=(a, d), IS NULL(a), IS NULL(d))], select=[a, b, c], build=[right])
-:- Calc(select=[a, b, c], where=[>(b, 10)])
-: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[broadcast])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- NestedLoopJoin(joinType=[LeftAntiJoin], where=[AND(OR(=(a, d), IS NULL(a), IS NULL(d)), $f3)], select=[a, b, c, $f3], build=[right])
+ :- Calc(select=[a, b, c, >(b, 10) AS $f3])
+ : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[broadcast])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml
index 4be436f..c687cc5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.xml
@@ -92,6 +92,33 @@ Calc(select=[c, g])
]]>
</Resource>
</TestCase>
+ <TestCase name="testFullOuterWithUsing">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[COALESCE($0, $3)], b=[$1], c=[$2], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CASE(IS NOT NULL(a), a, a0) AS $f0, b, c, b0, c0])
++- HashJoin(joinType=[FullOuterJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0], build=[right])
+ :- Exchange(distribution=[hash[a]])
+ : +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[a]])
+ +- TableSourceScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testInnerJoinWithEquiAndLocalPred">
<Resource name="sql">
<![CDATA[SELECT c, g FROM MyTable2 INNER JOIN MyTable1 ON a = d AND d < 2]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
index bd57fc8..9612610 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
@@ -872,14 +872,13 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0], build=[right])
- :- Exchange(distribution=[hash[a, b0]])
- : +- Calc(select=[a, b, c, CAST(b) AS b0])
- : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[hash[d, d]])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+HashJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c], build=[left])
+:- Exchange(distribution=[hash[a]])
+: +- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml
index d21ac60..51724cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -92,6 +92,33 @@ Calc(select=[c, g])
]]>
</Resource>
</TestCase>
+ <TestCase name="testFullOuterWithUsing">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[COALESCE($0, $3)], b=[$1], c=[$2], b0=[$4], c0=[$5])
++- LogicalJoin(condition=[=($0, $3)], joinType=[full])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[CASE(IS NOT NULL(a), a, a0) AS $f0, b, c, b0, c0])
++- SortMergeJoin(joinType=[FullOuterJoin], where=[=(a, a0)], select=[a, b, c, a0, b0, c0])
+ :- Exchange(distribution=[hash[a]])
+ : +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[hash[a]])
+ +- TableSourceScan(table=[[MyTable3, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testInnerJoinWithEquiAndLocalPred">
<Resource name="sql">
<![CDATA[SELECT c, g FROM MyTable2 INNER JOIN MyTable1 ON a = d AND d < 2]]>
@@ -268,32 +295,6 @@ Calc(select=[a, d])
]]>
</Resource>
</TestCase>
- <TestCase name="testJoinNonMatchingKeyTypes">
- <Resource name="sql">
- <![CDATA[SELECT c, g FROM MyTable1, MyTable2 WHERE a = g]]>
- </Resource>
- <Resource name="planBefore">
- <![CDATA[
-LogicalProject(c=[$2], g=[$6])
-+- LogicalFilter(condition=[=($0, $6)])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
- +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
-]]>
- </Resource>
- <Resource name="planAfter">
- <![CDATA[
-Calc(select=[c, g])
-+- SortMergeJoin(joinType=[InnerJoin], where=[=(a, g)], select=[a, c, g])
- :- Exchange(distribution=[hash[a]])
- : +- Calc(select=[a, c])
- : +- TableSourceScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[hash[g]])
- +- Calc(select=[g])
- +- TableSourceScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testLeftOuterJoinWithEquiAndLocalPred">
<Resource name="sql">
<![CDATA[SELECT c, g FROM MyTable2 LEFT OUTER JOIN MyTable1 ON a = d AND d < 2]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
index cba2428..7e427ac 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
@@ -929,14 +929,13 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c])
-+- SortMergeJoin(joinType=[LeftSemiJoin], where=[AND(=(a, d), =(b0, d))], select=[a, b, c, b0])
- :- Exchange(distribution=[hash[a, b0]])
- : +- Calc(select=[a, b, c, CAST(b) AS b0])
- : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- +- Exchange(distribution=[hash[d, d]])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+SortMergeJoin(joinType=[LeftSemiJoin], where=[=(a, d)], select=[a, b, c])
+:- Exchange(distribution=[hash[a]])
+: +- Calc(select=[a, b, c], where=[=(a, CAST(b))])
+: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[d]])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.xml
new file mode 100644
index 0000000..34f742f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.xml
@@ -0,0 +1,435 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testFilterPushDownLeftAnti1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+ (SELECT c FROM rightT WHERE c < 3)) T WHERE T.b > 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+ LogicalFilter(condition=[<($0, 3)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[OR(=($0, $2), IS NULL($0), IS NULL($2))], joinType=[anti])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[<($0, 3)])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushDownLeftAnti2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+ (SELECT * FROM rightT where c > 10)) T WHERE T.b > 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[>($0, 10)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[$2], joinType=[anti])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushDownLeftAnti3">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+(SELECT c FROM rightT WHERE b = d AND c < 3)) T WHERE T.b > 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+ LogicalFilter(condition=[AND(=($cor0.b, $1), <($0, 3))])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[AND(OR(=($0, $2), IS NULL($0), IS NULL($2)), =($1, $3))], joinType=[anti])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0], d=[$1])
+ +- LogicalFilter(condition=[<($0, 3)])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushDownLeftAnti4">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+ (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[anti])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushDownLeftSemi1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM (SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT)) T WHERE T.b > 2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[IN($0, {
+LogicalProject(c=[$0])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushDownLeftSemi2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM (SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT)) T WHERE T.b > 2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[EXISTS({
+LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[$2], joinType=[semi])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject($f0=[IS NOT NULL($0)])
+ +- LogicalAggregate(group=[{}], m=[MIN($0)])
+ +- LogicalProject(i=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushDownLeftSemi3">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM (SELECT * FROM leftT WHERE EXISTS
+ (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($1, 2)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[=($cor0.a, $0)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
+ :- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftAnti1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b > 2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+ LogicalFilter(condition=[>($cor0.b, 2)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[AND(OR(=($0, $3), IS NULL($0), IS NULL($3)), $2)], joinType=[anti])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftAnti2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE b > 2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[>($cor0.b, 2)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[$2], joinType=[anti])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftAnti3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b = d AND b > 1)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(IN($0, {
+LogicalProject(c=[$0])
+ LogicalFilter(condition=[AND(=($cor0.b, $1), >($cor0.b, 1))])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[AND(OR(=($0, $3), IS NULL($0), IS NULL($3)), =($1, $4), $2)], joinType=[anti])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 1)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0], d=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftAnti4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[NOT(EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, 2))])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[AND(=($0, $3), $2)], joinType=[anti])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftSemi1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT WHERE b > 2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(c=[$0])
+ LogicalFilter(condition=[>($cor0.b, 2)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+ : +- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftSemi2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE b > 2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[>($cor0.b, 2)])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[true], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+ : +- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionPushDownLeftSemi3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.b, 2))])
+ LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], $f2=[>($1, 2)])
+ : +- LogicalFilter(condition=[>($1, 2)])
+ : +- LogicalTableScan(table=[[leftT, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(c=[$0])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[rightT, source: [TestTableSource(c, d)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.xml
new file mode 100644
index 0000000..04b5bb2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.xml
@@ -0,0 +1,361 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testInnerJoin1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($3, $4), =($0, $3))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b = d]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4), =($1, $3))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $1), =($3, $4), =($0, $3))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND a = c]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4), =($0, $2))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $2), =($3, $4), =($0, $3))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin4">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b + 1 = d]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$4], e=[$5], f=[$6])
++- LogicalJoin(condition=[AND(=($0, $4), =($0, $5), =($3, $4))], joinType=[inner])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[+($1, 1)])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$4], e=[$5], f=[$6])
++- LogicalJoin(condition=[AND(=($0, $3), =($5, $4), =($0, $5))], joinType=[inner])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[+($1, 1)])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithNonEquiCondition1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a > e]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinWithNonEquiCondition2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b > d]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(=($0, $3), =($0, $4), >($1, $3))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5])
++- LogicalJoin(condition=[AND(>($1, $3), =($3, $4), =($0, $3))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoin_Exist1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a = e)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.a, $1))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($3, $4), =($0, $3))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoin_Exist2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS
+ (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b = d)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.a, $1), =($cor0.b, $0))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $1), =($3, $4), =($0, $3))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoin_In1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+ LogicalFilter(condition=[=($cor0.a, $1)])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($3, $4), =($0, $3))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoin_In2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e AND b = d)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+ LogicalFilter(condition=[AND(=($cor0.a, $1), =($cor0.b, $0))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $1), =($3, $4), =($0, $3))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoinWithNonEquiCondition_Exist1">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a > e)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), >($cor0.a, $1))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoinWithNonEquiCondition_Exist2">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM MyTable1 WHERE EXISTS
+ (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b > d)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[EXISTS({
+LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.a, $1), >($cor0.b, $0))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(>($1, $3), =($3, $4), =($0, $3))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoinWithNonEquiCondition_In1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+ LogicalFilter(condition=[>($cor0.a, $1)])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(=($0, $3), >($0, $4))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSemiJoinWithNonEquiCondition_In2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e AND b = d)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$0])
+ LogicalFilter(condition=[AND(>($cor0.a, $1), =($cor0.b, $0))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalJoin(condition=[AND(>($0, $4), =($0, $1), =($0, $3))], joinType=[semi])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.xml
new file mode 100644
index 0000000..9dfc80e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.xml
@@ -0,0 +1,266 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testInToSemiJoinDoubleEqualsDecimal">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.d IN (SELECT e FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($3, {
+LogicalProject(e=[$4])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalJoin(condition=[=($3, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ :- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(e=[$0], e0=[CAST($0):DOUBLE]), rowType=[RecordType(DECIMAL(38, 18) e, DOUBLE e0)]
+ +- LogicalProject(e=[$4]), rowType=[RecordType(DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInToSemiJoinFloatEqualsDecimal">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.c IN (SELECT e FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(e=[$4])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalJoin(condition=[=($5, $7)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE c0)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], c0=[CAST($2):DOUBLE]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE c0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(e=[$0], e0=[CAST($0):DOUBLE]), rowType=[RecordType(DECIMAL(38, 18) e, DOUBLE e0)]
+ +- LogicalProject(e=[$4]), rowType=[RecordType(DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInToSemiJoinIntEqualsDecimal">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT e FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(e=[$4])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DECIMAL(38, 18) a0)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):DECIMAL(38, 18)]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DECIMAL(38, 18) a0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(e=[$4]), rowType=[RecordType(DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInToSemiJoinFloatEqualsDouble">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.c IN (SELECT d FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(d=[$3])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalJoin(condition=[=($2, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ :- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(d=[$0], d0=[CAST($0):FLOAT]), rowType=[RecordType(DOUBLE d, FLOAT d0)]
+ +- LogicalProject(d=[$3]), rowType=[RecordType(DOUBLE d)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInToSemiJoinIntEqualsDouble">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT d FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(d=[$3])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE a0)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):DOUBLE]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, DOUBLE a0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(d=[$3]), rowType=[RecordType(DOUBLE d)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInToSemiJoinIntEqualsFloat">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT c FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(c=[$2])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, FLOAT a0)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):FLOAT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, FLOAT a0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(c=[$2]), rowType=[RecordType(FLOAT c)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInToSemiJoinIntEqualsLong">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T1 WHERE T1.a IN (SELECT b FROM T2)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(b=[$1])
+ LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]])
+})]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalJoin(condition=[=($5, $6)], joinType=[semi]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):BIGINT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalProject(b=[$1]), rowType=[RecordType(BIGINT b)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionEqualsTypesNotEquals01">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM T1 LEFT JOIN (SELECT COUNT(*) AS cnt FROM T2) AS x ON a = x.cnt]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(INTEGER a)]
++- LogicalJoin(condition=[=($5, $6)], joinType=[left]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0, BIGINT cnt)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):BIGINT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalAggregate(group=[{}], cnt=[COUNT()]), rowType=[RecordType(BIGINT cnt)]
+ +- LogicalProject($f0=[0]), rowType=[RecordType(INTEGER $f0)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(INTEGER a)]
++- LogicalJoin(condition=[=($5, $6)], joinType=[left]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0, BIGINT cnt)]
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a0=[CAST($0):BIGINT]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e, BIGINT a0)]
+ : +- LogicalTableScan(table=[[T1, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+ +- LogicalAggregate(group=[{}], cnt=[COUNT()]), rowType=[RecordType(BIGINT cnt)]
+ +- LogicalProject($f0=[0]), rowType=[RecordType(INTEGER $f0)]
+ +- LogicalTableScan(table=[[T2, source: [TestTableSource(a, b, c, d, e)]]]), rowType=[RecordType(INTEGER a, BIGINT b, FLOAT c, DOUBLE d, DECIMAL(38, 18) e)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinConditionEqualsTypesNotEquals02">
+ <Resource name="sql">
+ <![CDATA[
+-- TC 01.04
+SELECT t3a,
+ t3b
+FROM T3
+WHERE t3c IN (SELECT t4b
+ FROM T4
+ WHERE t3a = t4a
+ OR t3b > t4b)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(t3a=[$0], t3b=[$1]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
++- LogicalFilter(condition=[IN($2, {
+LogicalProject(t4b=[$1])
+ LogicalFilter(condition=[OR(=($cor0.t3a, $0), >($cor0.t3b, $1))])
+ LogicalTableScan(table=[[T4, source: [TestTableSource(t4a, t4b, t4c)]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+ +- LogicalTableScan(table=[[T3, source: [TestTableSource(t3a, t3b, t3c)]]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(t3a=[$0], t3b=[$1]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
++- LogicalJoin(condition=[AND(=($0, $4), =($1, $5), =($2, CAST($3):INTEGER))], joinType=[inner]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c, SMALLINT t4b, VARCHAR(65536) t3a0, SMALLINT t3b0)]
+ :- LogicalTableScan(table=[[T3, source: [TestTableSource(t3a, t3b, t3c)]]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+ +- LogicalAggregate(group=[{0, 1, 2}]), rowType=[RecordType(SMALLINT t4b, VARCHAR(65536) t3a, SMALLINT t3b)]
+ +- LogicalProject(t4b=[$1], t3a=[$3], t3b=[$4]), rowType=[RecordType(SMALLINT t4b, VARCHAR(65536) t3a, SMALLINT t3b)]
+ +- LogicalJoin(condition=[OR(=($3, $0), >($4, $1))], joinType=[inner]), rowType=[RecordType(VARCHAR(65536) t4a, SMALLINT t4b, INTEGER t4c, VARCHAR(65536) t3a, SMALLINT t3b)]
+ :- LogicalTableScan(table=[[T4, source: [TestTableSource(t4a, t4b, t4c)]]]), rowType=[RecordType(VARCHAR(65536) t4a, SMALLINT t4b, INTEGER t4c)]
+ +- LogicalAggregate(group=[{0, 1}]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
+ +- LogicalProject(t3a=[$0], t3b=[$1]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b)]
+ +- LogicalTableScan(table=[[T3, source: [TestTableSource(t3a, t3b, t3c)]]]), rowType=[RecordType(VARCHAR(65536) t3a, SMALLINT t3b, INTEGER t3c)]
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
new file mode 100644
index 0000000..4ff1748
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testThreeOr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (b = e AND a = 0) OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($1, $4), =($0, 0)), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 0), =($0, 1), =($0, 2)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAnd">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE b = e AND ((a = 1 AND d = 2) OR (a = 2 AND d = 1))]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(=($1, $4), OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCanNotMatchThisRule">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE b = e OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(=($1, $4), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[OR(=($1, $4), AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiFields">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND b = 1 AND d = 2 AND e = 2) OR (a = 2 AND b = 2 AND d = 1 AND e = 1)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($0, 2), =($1, 2), =($3, 1), =($4, 1))), OR(AND(=($0, 1), =($1, 1)), AND(=($0, 2), =($1, 2))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAndOr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE ((a = 1 AND d = 2) OR (a = 2 AND d = 1)) AND ((a = 3 AND d = 4) OR (a = 4 AND d = 3))]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(AND(=($0, 3), =($3, 4)), AND(=($0, 4), =($3, 3))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)), OR(=($0, 3), =($0, 4)), OR(=($3, 4), =($3, 3)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiJoins">
+ <Resource name="sql">
+ <![CDATA[
+SELECT T1.a, T2.d FROM MyTable1 T1,
+ (SELECT * FROM MyTable1, MyTable2 WHERE a = d) T2 WHERE
+(T1.a = 1 AND T1.b = 1 AND T2.a = 2 AND T2.e = 2)
+OR
+(T1.a = 2 AND T2.b = 2 AND T2.d = 1 AND T2.e = 1)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$6])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7])
+ +- LogicalFilter(condition=[=($0, $3)])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$6])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($7, 2)), AND(=($0, 2), =($4, 2), =($6, 1), =($7, 1))), OR(AND(=($0, 1), =($1, 1)), =($0, 2)), OR(AND(=($3, 2), =($7, 2)), AND(=($4, 2), =($6, 1), =($7, 1))))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], g=[$6], h=[$7])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiSingleSideFields">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND b = 1 AND d = 2 AND e = 2) OR (d = 1 AND e = 1)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($1, 1), =($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))), OR(AND(=($3, 2), =($4, 2)), AND(=($3, 1), =($4, 1))))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimple">
+ <Resource name="sql">
+ <![CDATA[SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND d = 2) OR (a = 2 AND d = 1)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalFilter(condition=[OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], d=[$3])
++- LogicalJoin(condition=[AND(OR(AND(=($0, 1), =($3, 2)), AND(=($0, 2), =($3, 1))), OR(=($0, 1), =($0, 2)), OR(=($3, 2), =($3, 1)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f, g, h)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.xml
new file mode 100644
index 0000000..58233bf
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.xml
@@ -0,0 +1,212 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testFullJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 FULL JOIN MyTable2 ON c1 = c2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[full])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[full])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin_NoneEquiJoinKeys">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a1 > a2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[>($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[>($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin_NoNullCount">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON d1 = d2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($3, $8)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($3, $8)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin_NullCountLessThanThreshold">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON e1 = e2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($4, $9)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($4, $9)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin_NullCountOnLeftJoinKeys">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON a1 = a2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+ :- LogicalFilter(condition=[IS NOT NULL($0)])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin_NullCountOnLeftRightJoinKeys">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON c1 = c2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[inner])
+ :- LogicalFilter(condition=[IS NOT NULL($2)])
+ : +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalFilter(condition=[IS NOT NULL($2)])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON c1 = c2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[left])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[left])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoin_NullCountOnRightJoinKeys">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 JOIN MyTable2 ON b1 = b2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($1, $6)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($1, $6)], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalFilter(condition=[IS NOT NULL($1)])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRightJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON c1 = c2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[right])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], c1=[$2], d1=[$3], e1=[$4], a2=[$5], b2=[$6], c2=[$7], d2=[$8], e2=[$9])
++- LogicalJoin(condition=[=($2, $7)], joinType=[right])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a1, b1, c1, d1, e1)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(a2, b2, c2, d2, e2)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
new file mode 100644
index 0000000..35d22ba
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testSimplifyJoinConditionFromSubQuery">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a FROM MyTable1 WHERE b = (
+ SELECT COUNT(*) FROM MyTable2 WHERE (d = a AND d < 2) OR (d = a AND b = 5))
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[=($1, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ LogicalProject($f0=[0])
+ LogicalFilter(condition=[OR(AND(=($0, $cor0.a), <($0, 2)), AND(=($0, $cor0.a), =($cor0.b, 5)))])
+ LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+}))], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[=($1, CASE(IS NULL($5), 0:BIGINT, $5))])
+ +- LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[left])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT()])
+ +- LogicalProject(a=[$3], b=[$4], $f0=[0])
+ +- LogicalJoin(condition=[AND(=($0, $3), OR(<($0, 2), =($4, 5)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+ +- LogicalAggregate(group=[{0, 1}])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimplifyJoinCondition">
+ <Resource name="sql">
+ <![CDATA[SELECT d FROM MyTable1 JOIN MyTable2 ON (d = a AND a > 2) OR (d = a AND b = 1)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[OR(AND(=($3, $0), >($0, 2)), AND(=($3, $0), =($1, 1)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[AND(=($3, $0), OR(>($0, 2), =($1, 1)))], joinType=[inner])
+ :- LogicalTableScan(table=[[MyTable1, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[MyTable2, source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
index ac4f50d..bffdd28 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
@@ -1794,13 +1794,15 @@ LogicalAggregate(group=[{}], EXPR$0=[AVG($0)])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
- :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
- +- LogicalProject(EXPR$0=[$1], f=[$0])
- +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
- +- LogicalProject(f=[$2], e=[$1])
- +- LogicalFilter(condition=[true])
- +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalJoin(condition=[AND(=($3, $4), =($2, $5))], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], b0=[CAST($1):DOUBLE])
+ : +- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
+ +- LogicalProject(EXPR$0=[$1], f=[$0])
+ +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)])
+ +- LogicalProject(f=[$2], e=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
]]>
</Resource>
</TestCase>
@@ -1825,14 +1827,15 @@ LogicalProject(d=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), >($2, $4))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $5), >($2, $4))], joinType=[semi])
:- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
- +- LogicalProject(d=[$0], f=[$1])
- +- LogicalFilter(condition=[true])
- +- LogicalProject(d=[$1], f=[$0])
- +- LogicalAggregate(group=[{0}], d=[MAX($1)])
- +- LogicalProject(f=[$2], d=[$0])
- +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+ +- LogicalProject(d=[$0], f=[$1], d0=[CAST($0):BIGINT])
+ +- LogicalProject(d=[$0], f=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalProject(d=[$1], f=[$0])
+ +- LogicalAggregate(group=[{0}], d=[MAX($1)])
+ +- LogicalProject(f=[$2], d=[$0])
+ +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
]]>
</Resource>
</TestCase>
@@ -2175,11 +2178,13 @@ LogicalProject(d=[$1])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[AND(=($0, $2), =($2, $1))], joinType=[semi])
- :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
- +- LogicalProject(d=[$1])
- +- LogicalFilter(condition=[true])
- +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[AND(=($2, $3), =($3, $1))], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], a0=[CAST($0):BIGINT])
+ : +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(d=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
]]>
</Resource>
</TestCase>
@@ -2204,14 +2209,15 @@ LogicalProject(d=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $5), =($2, $4))], joinType=[semi])
:- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
- +- LogicalProject(d=[$0], k=[$4])
- +- LogicalJoin(condition=[=($1, $3)], joinType=[inner])
- :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(j=[$1], k=[$2])
- +- LogicalFilter(condition=[>($0, 10)])
- +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
+ +- LogicalProject(d=[$0], k=[$1], d0=[CAST($0):BIGINT])
+ +- LogicalProject(d=[$0], k=[$4])
+ +- LogicalJoin(condition=[=($1, $3)], joinType=[inner])
+ :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+ +- LogicalProject(j=[$1], k=[$2])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
]]>
</Resource>
</TestCase>
@@ -2498,14 +2504,15 @@ LogicalProject(d=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(c=[$2])
-+- LogicalJoin(condition=[AND(=($1, $3), =($2, $4))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $5), =($2, $4))], joinType=[semi])
:- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
- +- LogicalProject(d=[$0], k=[$4])
- +- LogicalJoin(condition=[=($1, $3)], joinType=[right])
- :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(j=[$1], k=[$2])
- +- LogicalFilter(condition=[>($0, 10)])
- +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
+ +- LogicalProject(d=[$0], k=[$1], d0=[CAST($0):BIGINT])
+ +- LogicalProject(d=[$0], k=[$4])
+ +- LogicalJoin(condition=[=($1, $3)], joinType=[right])
+ :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+ +- LogicalProject(j=[$1], k=[$2])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
]]>
</Resource>
</TestCase>
@@ -3503,27 +3510,29 @@ LogicalProject(d=[$0], e=[$1])
<![CDATA[
LogicalProject(c=[$2])
+- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalJoin(condition=[AND(=($3, $5), =($4, $6))], joinType=[semi])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[CASE(AND(<>($3, 0), IS NOT NULL($6), IS NOT NULL($0)), 1, 2)], $f4=[CASE(AND(<>($7, 0), IS NOT NULL($10), IS NOT NULL($1)), 3, 4)])
- : +- LogicalJoin(condition=[=($1, $9)], joinType=[left])
- : :- LogicalJoin(condition=[true], joinType=[inner])
- : : :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
- : : : :- LogicalJoin(condition=[true], joinType=[inner])
- : : : : :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
- : : : : +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
- : : : : +- LogicalProject(i=[$0])
- : : : : +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
- : : : +- LogicalAggregate(group=[{0, 1}])
- : : : +- LogicalProject(i=[$0], i0=[true])
- : : : +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
- : : +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
- : : +- LogicalProject(j=[$0])
- : : +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
- : +- LogicalAggregate(group=[{0, 1}])
- : +- LogicalProject(j=[$0], i=[true])
- : +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
- +- LogicalProject(d=[$0], e=[$1])
- +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[$4])
+ +- LogicalJoin(condition=[AND(=($3, $6), =($5, $7))], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$3], $f4=[$4], $f40=[CAST($4):BIGINT NOT NULL])
+ : +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[CASE(AND(<>($3, 0), IS NOT NULL($6), IS NOT NULL($0)), 1, 2)], $f4=[CASE(AND(<>($7, 0), IS NOT NULL($10), IS NOT NULL($1)), 3, 4)])
+ : +- LogicalJoin(condition=[=($1, $9)], joinType=[left])
+ : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : :- LogicalJoin(condition=[=($0, $5)], joinType=[left])
+ : : : :- LogicalJoin(condition=[true], joinType=[inner])
+ : : : : :- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
+ : : : : +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
+ : : : : +- LogicalProject(i=[$0])
+ : : : : +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
+ : : : +- LogicalAggregate(group=[{0, 1}])
+ : : : +- LogicalProject(i=[$0], i0=[true])
+ : : : +- LogicalTableScan(table=[[t1, source: [TestTableSource(i)]]])
+ : : +- LogicalAggregate(group=[{}], c=[COUNT()], ck=[COUNT($0)])
+ : : +- LogicalProject(j=[$0])
+ : : +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
+ : +- LogicalAggregate(group=[{0, 1}])
+ : +- LogicalProject(j=[$0], i=[true])
+ : +- LogicalTableScan(table=[[t2, source: [TestTableSource(j)]]])
+ +- LogicalProject(d=[$0], e=[$1])
+ +- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
]]>
</Resource>
</TestCase>
@@ -3573,14 +3582,15 @@ LogicalProject(d=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(c=[$2])
-+- LogicalJoin(condition=[=($1, $3)], joinType=[semi])
++- LogicalJoin(condition=[=($1, $4)], joinType=[semi])
:- LogicalTableScan(table=[[l, source: [TestTableSource(a, b, c)]]])
- +- LogicalProject(d=[$0])
- +- LogicalJoin(condition=[=($1, $3)], joinType=[full])
- :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
- +- LogicalProject(j=[$1])
- +- LogicalFilter(condition=[>($0, 10)])
- +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
+ +- LogicalProject(d=[$0], d0=[CAST($0):BIGINT])
+ +- LogicalProject(d=[$0])
+ +- LogicalJoin(condition=[=($1, $3)], joinType=[full])
+ :- LogicalTableScan(table=[[r, source: [TestTableSource(d, e, f)]]])
+ +- LogicalProject(j=[$1])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[t, source: [TestTableSource(i, j, k)]]])
]]>
</Resource>
</TestCase>
@@ -3665,10 +3675,12 @@ LogicalProject(d=[$1])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
- :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
- +- LogicalProject(d=[$1])
- +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalJoin(condition=[=($2, $3)], joinType=[semi])
+ :- LogicalProject(a=[$0], b=[$1], a0=[CAST($0):BIGINT])
+ : +- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
+ +- LogicalProject(d=[$1])
+ +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
]]>
</Resource>
</TestCase>
@@ -4385,14 +4397,15 @@ LogicalProject(e=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[AND(=($1, $2), =($3, $1))], joinType=[semi])
++- LogicalJoin(condition=[AND(=($1, $4), =($3, $1))], joinType=[semi])
:- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
: :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
: +- LogicalProject(c=[$0])
: +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
- +- LogicalProject(e=[$0], f=[$1])
- +- LogicalFilter(condition=[true])
- +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
+ +- LogicalProject(e=[$0], f=[$1], e0=[CAST($0):BIGINT])
+ +- LogicalProject(e=[$0], f=[$1])
+ +- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
]]>
</Resource>
</TestCase>
@@ -4480,13 +4493,14 @@ LogicalProject(e=[$0])
<Resource name="planAfter">
<![CDATA[
LogicalProject(a=[$0], b=[$1])
-+- LogicalJoin(condition=[=($1, $2)], joinType=[semi])
++- LogicalJoin(condition=[=($1, $3)], joinType=[semi])
:- LogicalJoin(condition=[=($0, $2)], joinType=[semi])
: :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b)]]])
: +- LogicalProject(c=[$0])
: +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
- +- LogicalProject(e=[$0])
- +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
+ +- LogicalProject(e=[$0], e0=[CAST($0):BIGINT])
+ +- LogicalProject(e=[$0])
+ +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
]]>
</Resource>
</TestCase>
@@ -4544,7 +4558,7 @@ LogicalProject(e=[$0])
<![CDATA[
LogicalProject(a=[$0], b=[$1])
+- LogicalProject(a=[$0], b=[$1])
- +- LogicalJoin(condition=[=($3, $4)], joinType=[semi])
+ +- LogicalJoin(condition=[=($3, $5)], joinType=[semi])
:- LogicalProject(a=[$0], b=[$1], $f2=[$2], $f3=[*($1, 2)])
: +- LogicalJoin(condition=[=($2, $3)], joinType=[semi])
: :- LogicalProject(a=[$0], b=[$1], $f2=[+($0, 1)])
@@ -4552,9 +4566,10 @@ LogicalProject(a=[$0], b=[$1])
: +- LogicalProject(c=[$0])
: +- LogicalFilter(condition=[>($1, 10)])
: +- LogicalTableScan(table=[[y, source: [TestTableSource(c, d)]]])
- +- LogicalProject(e=[$0])
- +- LogicalFilter(condition=[<($1, 10)])
- +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
+ +- LogicalProject(e=[$0], e0=[CAST($0):BIGINT])
+ +- LogicalProject(e=[$0])
+ +- LogicalFilter(condition=[<($1, 10)])
+ +- LogicalTableScan(table=[[z, source: [TestTableSource(e, f)]]])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml
index 55657c9..bbc870f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -2067,13 +2067,14 @@ LogicalFilter(condition=[<($cor0.a, 10)])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Join(joinType=[LeftAntiJoin], where=[true], select=[a, b, c])
-:- Exchange(distribution=[single])
-: +- Calc(select=[a, b, c], where=[<(a, 10)])
-: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[single])
- +- Calc(select=[])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- Join(joinType=[LeftAntiJoin], where=[$f3], select=[a, b, c, $f3])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, b, c, <(a, 10) AS $f3])
+ : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
@@ -2380,13 +2381,14 @@ LogicalProject(d=[$0])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Join(joinType=[LeftAntiJoin], where=[OR(=(a, d), IS NULL(a), IS NULL(d))], select=[a, b, c])
-:- Exchange(distribution=[single])
-: +- Calc(select=[a, b, c], where=[>(b, 10)])
-: +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Exchange(distribution=[single])
- +- Calc(select=[d])
- +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+Calc(select=[a, b, c])
++- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(a, d), IS NULL(a), IS NULL(d)), $f3)], select=[a, b, c, $f3])
+ :- Exchange(distribution=[single])
+ : +- Calc(select=[a, b, c, >(b, 10) AS $f3])
+ : +- TableSourceScan(table=[[l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[d])
+ +- TableSourceScan(table=[[r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml
index 4cd79fb..6cc4d22 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/WindowJoinTest.xml
@@ -35,12 +35,12 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[AND(=(a, a0), =(PROCTIME_MATERIALIZE(proctime), PROCTIME_MATERIALIZE(proctime0)))], select=[a, proctime, a0, b, proctime0])
- :- Exchange(distribution=[hash[a]])
- : +- Calc(select=[a, proctime])
++- Join(joinType=[InnerJoin], where=[AND(=(a, a0), =($f5, $f50))], select=[a, $f5, a0, b, $f50])
+ :- Exchange(distribution=[hash[a, $f5]])
+ : +- Calc(select=[a, PROCTIME_MATERIALIZE(proctime) AS $f5])
: +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, b, proctime])
+ +- Exchange(distribution=[hash[a, $f5]])
+ +- Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS $f5])
+- DataStreamScan(table=[[_DataStreamTable_1]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
@@ -64,12 +64,12 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="planAfter">
<![CDATA[
Calc(select=[a, b])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[AND(=(a, a0), =(CAST(rowtime), CAST(rowtime0)))], select=[a, rowtime, a0, b, rowtime0])
- :- Exchange(distribution=[hash[a]])
- : +- Calc(select=[a, rowtime])
++- Join(joinType=[InnerJoin], where=[AND(=(a, a0), =(rowtime0, rowtime00))], select=[a, rowtime0, a0, b, rowtime00])
+ :- Exchange(distribution=[hash[a, rowtime0]])
+ : +- Calc(select=[a, CAST(rowtime) AS rowtime0])
: +- DataStreamScan(table=[[_DataStreamTable_0]], fields=[a, b, c, proctime, rowtime])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, b, rowtime])
+ +- Exchange(distribution=[hash[a, rowtime0]])
+ +- Calc(select=[a, b, CAST(rowtime) AS rowtime0])
+- DataStreamScan(table=[[_DataStreamTable_1]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
index 0bde6c9..fcfd2cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/BroadcastHashJoinTest.scala
@@ -34,13 +34,6 @@ class BroadcastHashJoinTest extends JoinTestBase {
}
@Test
- override def testJoinNonMatchingKeyTypes(): Unit = {
- thrown.expect(classOf[TableException])
- thrown.expectMessage("Equality join predicate on incompatible types")
- super.testJoinNonMatchingKeyTypes()
- }
-
- @Test
override def testInnerJoinWithoutJoinPred(): Unit = {
thrown.expect(classOf[TableException])
thrown.expectMessage("Cannot generate a valid execution plan for the given query")
@@ -125,6 +118,13 @@ class BroadcastHashJoinTest extends JoinTestBase {
}
@Test
+ override def testFullOuterWithUsing(): Unit = {
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage("Cannot generate a valid execution plan for the given query")
+ super.testFullOuterWithUsing()
+ }
+
+ @Test
override def testFullOuterJoinOnTrue(): Unit = {
thrown.expect(classOf[TableException])
thrown.expectMessage("Cannot generate a valid execution plan for the given query")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala
index 120501f..f7a96cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinTestBase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.plan.batch.sql.join
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableConfigOptions, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
import org.junit.Test
@@ -34,9 +34,9 @@ abstract class JoinTestBase extends TableTestBase {
util.verifyPlan("SELECT c, g FROM MyTable1, MyTable2 WHERE foo = e")
}
- @Test
+ @Test(expected = classOf[TableException])
def testJoinNonMatchingKeyTypes(): Unit = {
- // TODO do implicit type coercion
+ // INTEGER and VARCHAR(65536) do not have a common type yet
util.verifyPlan("SELECT c, g FROM MyTable1, MyTable2 WHERE a = g")
}
@@ -182,6 +182,16 @@ abstract class JoinTestBase extends TableTestBase {
}
@Test
+ def testFullOuterWithUsing(): Unit = {
+ util.addTableSource[(Int, Long, String)]("MyTable3", 'a, 'b, 'c)
+ val sqlQuery =
+ """
+ |SELECT * FROM (SELECT * FROM MyTable1) FULL JOIN (SELECT * FROM MyTable3) USING (a)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
def testCrossJoin(): Unit = {
util.verifyPlan("SELECT * FROM MyTable2 CROSS JOIN MyTable1")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 94f015e..9aa2347 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -23,8 +23,9 @@ import org.apache.flink.table.calcite.CalciteConfig
import org.apache.flink.table.plan.optimize.program.FlinkBatchProgram
import org.apache.flink.table.plan.stream.sql.join.TestTemporalTable
import org.apache.flink.table.util.TableTestBase
+
import org.junit.Assert.{assertTrue, fail}
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
class LookupJoinTest extends TableTestBase {
private val testUtil = batchTestUtil()
@@ -121,6 +122,9 @@ class LookupJoinTest extends TableTestBase {
.replaceBatchProgram(programs).build()
testUtil.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage("VARCHAR(65536) and INTEGER does not have common type now")
+
testUtil.verifyPlan("SELECT * FROM MyTable AS T JOIN temporalTest "
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
}
@@ -248,8 +252,7 @@ class LookupJoinTest extends TableTestBase {
private def expectExceptionThrown(
sql: String,
keywords: String,
- clazz: Class[_ <: Throwable] = classOf[ValidationException])
- : Unit = {
+ clazz: Class[_ <: Throwable] = classOf[ValidationException]): Unit = {
try {
testUtil.verifyExplain(sql)
fail(s"Expected a $clazz, but no exception is thrown.")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
index 3199f93..a5b639c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/ShuffledHashJoinTest.scala
@@ -32,13 +32,6 @@ class ShuffledHashJoinTest extends JoinTestBase {
}
@Test
- override def testJoinNonMatchingKeyTypes(): Unit = {
- thrown.expect(classOf[TableException])
- thrown.expectMessage("Equality join predicate on incompatible types")
- super.testJoinNonMatchingKeyTypes()
- }
-
- @Test
override def testInnerJoinWithoutJoinPred(): Unit = {
thrown.expect(classOf[TableException])
thrown.expectMessage("Cannot generate a valid execution plan for the given query")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.scala
new file mode 100644
index 0000000..3f08f1f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRuleTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram,
+ FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[FlinkFilterJoinRule]].
+ */
+class FlinkFilterJoinRuleTest extends TableTestBase {
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+ FilterProjectTransposeRule.INSTANCE,
+ FlinkFilterJoinRule.FILTER_ON_JOIN,
+ FlinkFilterJoinRule.JOIN))
+ .build()
+ )
+
+ util.addTableSource[(Int, Long)]("leftT", 'a, 'b)
+ util.addTableSource[(Int, Long)]("rightT", 'c, 'd)
+ }
+
+ @Test
+ def testFilterPushDownLeftSemi1(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM (SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT)) T WHERE T.b > 2"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testFilterPushDownLeftSemi2(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM (SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT)) T WHERE T.b > 2"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testFilterPushDownLeftSemi3(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM (SELECT * FROM leftT WHERE EXISTS
+ | (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftSemi1(): Unit = {
+ util.verifyPlan("SELECT * FROM leftT WHERE a IN (SELECT c FROM rightT WHERE b > 2)")
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftSemi2(): Unit = {
+ util.verifyPlan("SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE b > 2)")
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftSemi3(): Unit = {
+ util.verifyPlan("SELECT * FROM leftT WHERE EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)")
+ }
+
+ @Test
+ def testFilterPushDownLeftAnti1(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+ | (SELECT c FROM rightT WHERE c < 3)) T WHERE T.b > 2
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testFilterPushDownLeftAnti2(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+ | (SELECT * FROM rightT where c > 10)) T WHERE T.b > 2
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testFilterPushDownLeftAnti3(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM (SELECT * FROM leftT WHERE a NOT IN
+ |(SELECT c FROM rightT WHERE b = d AND c < 3)) T WHERE T.b > 2
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testFilterPushDownLeftAnti4(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM (SELECT * FROM leftT WHERE NOT EXISTS
+ | (SELECT * FROM rightT WHERE a = c)) T WHERE T.b > 2
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftAnti1(): Unit = {
+ util.verifyPlan("SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b > 2)")
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftAnti2(): Unit = {
+ util.verifyPlan("SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE b > 2)")
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftAnti3(): Unit = {
+ val sqlQuery = "SELECT * FROM leftT WHERE a NOT IN (SELECT c FROM rightT WHERE b = d AND b > 1)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testJoinConditionPushDownLeftAnti4(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM leftT WHERE NOT EXISTS (SELECT * FROM rightT WHERE a = c AND b > 2)"
+ util.verifyPlan(sqlQuery)
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.scala
new file mode 100644
index 0000000..e1a5b6f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionEqualityTransferRuleTest.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+
+/**
+ * Test for [[JoinConditionEqualityTransferRule]].
+ */
+class JoinConditionEqualityTransferRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(JoinConditionEqualityTransferRule.INSTANCE))
+ .build()
+ )
+
+ util.addTableSource[(Int, Int, Int)]("MyTable1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Int, Int)]("MyTable2", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testInnerJoin1(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e")
+ }
+
+ @Test
+ def testInnerJoin2(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b = d")
+ }
+
+ @Test
+ def testInnerJoin3(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND a = c")
+ }
+
+ @Test
+ def testInnerJoin4(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b + 1 = d")
+ }
+
+ @Test
+ def testInnerJoinWithNonEquiCondition1(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a > e")
+ }
+
+ @Test
+ def testInnerJoinWithNonEquiCondition2(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a = d AND a = e AND b > d")
+ }
+
+ @Test
+ def testSemiJoin_In1(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e)")
+ }
+
+ @Test
+ def testSemiJoin_In2(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a = e AND b = d)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testSemiJoinWithNonEquiCondition_In1(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e)")
+ }
+
+ @Test
+ def testSemiJoinWithNonEquiCondition_In2(): Unit = {
+ val sqlQuery =
+ "SELECT * FROM MyTable1 WHERE a IN (SELECT d FROM MyTable2 WHERE a > e AND b = d)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testSemiJoin_Exist1(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a = e)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testSemiJoin_Exist2(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM MyTable1 WHERE EXISTS
+ | (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b = d)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testSemiJoinWithNonEquiCondition_Exist1(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM MyTable1 WHERE EXISTS (SELECT * FROM MyTable2 WHERE a = d AND a > e)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testSemiJoinWithNonEquiCondition_Exist2(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT * FROM MyTable1 WHERE EXISTS
+ | (SELECT * FROM MyTable2 WHERE a = d AND a = e AND b > d)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.scala
new file mode 100644
index 0000000..0f1cf9b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinConditionTypeCoerceRuleTest.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[JoinConditionTypeCoerceRule]].
+ * Currently only the semi-join rewrite loses type consistency, so we only cover this kind of
+ * case.
+ */
+class JoinConditionTypeCoerceRuleTest extends TableTestBase {
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(JoinConditionTypeCoerceRule.INSTANCE))
+ .build()
+ )
+
+ util.addTableSource[(Int, Long, Float, Double, java.math.BigDecimal)]("T1", 'a, 'b, 'c, 'd, 'e)
+ util.addTableSource[(Int, Long, Float, Double, java.math.BigDecimal)]("T2", 'a, 'b, 'c, 'd, 'e)
+ }
+
+ @Test
+ def testInToSemiJoinIntEqualsLong(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT b FROM T2)")
+ }
+
+ @Test
+ def testInToSemiJoinIntEqualsFloat(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT c FROM T2)")
+ }
+
+ @Test
+ def testInToSemiJoinIntEqualsDouble(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT d FROM T2)")
+ }
+
+ @Test
+ def testInToSemiJoinIntEqualsDecimal(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.a IN (SELECT e FROM T2)")
+ }
+
+ @Test
+ def testInToSemiJoinFloatEqualsDouble(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.c IN (SELECT d FROM T2)")
+ }
+
+ @Test
+ def testInToSemiJoinFloatEqualsDecimal(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.c IN (SELECT e FROM T2)")
+ }
+
+ @Test
+ def testInToSemiJoinDoubleEqualsDecimal(): Unit = {
+ util.verifyPlanWithType("SELECT * FROM T1 WHERE T1.d IN (SELECT e FROM T2)")
+ }
+
+ // Test nullability mismatch.
+ @Test
+ def testJoinConditionEqualsTypesNotEquals01(): Unit = {
+ val sqlQuery = "SELECT a FROM T1 LEFT JOIN (SELECT COUNT(*) AS cnt FROM T2) AS x ON a = x.cnt"
+ util.verifyPlanWithType(sqlQuery)
+ }
+
+ // A join with an OR predicate is not rewritten by the sub-query rule but by decorrelation.
+ @Test
+ def testJoinConditionEqualsTypesNotEquals02(): Unit = {
+ util.addTableSource[(String, Short, Int)]("T3", 't3a, 't3b, 't3c)
+ util.addTableSource[(String, Short, Int)]("T4", 't4a, 't4b, 't4c)
+
+ val sqlQuery =
+ """
+ |-- TC 01.04
+ |SELECT t3a,
+ | t3b
+ |FROM T3
+ |WHERE t3c IN (SELECT t4b
+ | FROM T4
+ | WHERE t3a = t4a
+ | OR t3b > t4b)
+ """.stripMargin
+ util.verifyPlanWithType(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.scala
new file mode 100644
index 0000000..47eaf76
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDependentConditionDerivationRuleTest.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+
+/**
+ * Test for [[JoinDependentConditionDerivationRule]].
+ */
+class JoinDependentConditionDerivationRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+ FlinkFilterJoinRule.FILTER_ON_JOIN,
+ FlinkFilterJoinRule.JOIN,
+ JoinDependentConditionDerivationRule.INSTANCE))
+ .build()
+ )
+
+ util.addTableSource[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, Int, String, Long)]("MyTable2", 'd, 'e, 'f, 'g, 'h)
+ }
+
+ @Test
+ def testSimple(): Unit = {
+ val sqlQuery =
+ "SELECT a, d FROM MyTable1, MyTable2 WHERE (a = 1 AND d = 2) OR (a = 2 AND d = 1)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testAnd(): Unit = {
+ val sqlQuery =
+ "SELECT a, d FROM MyTable1, MyTable2 WHERE b = e AND ((a = 1 AND d = 2) OR (a = 2 AND d = 1))"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testCanNotMatchThisRule(): Unit = {
+ val sqlQuery =
+ "SELECT a, d FROM MyTable1, MyTable2 WHERE b = e OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testThreeOr(): Unit = {
+ val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+ "(b = e AND a = 0) OR ((a = 1 AND d = 2) OR (a = 2 AND d = 1))"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testAndOr(): Unit = {
+ val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+ "((a = 1 AND d = 2) OR (a = 2 AND d = 1)) AND ((a = 3 AND d = 4) OR (a = 4 AND d = 3))"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiFields(): Unit = {
+ val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+ "(a = 1 AND b = 1 AND d = 2 AND e = 2) OR (a = 2 AND b = 2 AND d = 1 AND e = 1)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiSingleSideFields(): Unit = {
+ val sqlQuery = "SELECT a, d FROM MyTable1, MyTable2 WHERE " +
+ "(a = 1 AND b = 1 AND d = 2 AND e = 2) OR (d = 1 AND e = 1)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testMultiJoins(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT T1.a, T2.d FROM MyTable1 T1,
+ | (SELECT * FROM MyTable1, MyTable2 WHERE a = d) T2 WHERE
+ |(T1.a = 1 AND T1.b = 1 AND T2.a = 2 AND T2.e = 2)
+ |OR
+ |(T1.a = 2 AND T2.b = 2 AND T2.d = 1 AND T2.e = 1)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
new file mode 100644
index 0000000..b70cdaa
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/JoinDeriveNullFilterRuleTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{PlannerConfigOptions, Types}
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * Test for [[JoinDeriveNullFilterRule]].
+ */
+class JoinDeriveNullFilterRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+ "rules",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(JoinDeriveNullFilterRule.INSTANCE))
+ .build()
+ )
+
+ util.tableEnv.getConfig.getConf.setLong(
+ PlannerConfigOptions.SQL_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD, 2000000)
+ util.addTableSource("MyTable1",
+ Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING, Types.INT, Types.LONG),
+ Array("a1", "b1", "c1", "d1", "e1"),
+ Some(new TableStats(1000000000, Map(
+ "a1" -> new ColumnStats(null, 10000000L, 4.0, 4, null, null),
+ "c1" -> new ColumnStats(null, 5000000L, 10.2, 16, null, null),
+ "e1" -> new ColumnStats(null, 500000L, 8.0, 8, null, null)
+ ))))
+ util.addTableSource("MyTable2",
+ Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING, Types.INT, Types.LONG),
+ Array("a2", "b2", "c2", "d2", "e2"),
+ Some(new TableStats(2000000000, Map(
+ "b2" -> new ColumnStats(null, 10000000L, 8.0, 8, null, null),
+ "c2" -> new ColumnStats(null, 3000000L, 18.6, 32, null, null),
+ "e2" -> new ColumnStats(null, 1500000L, 8.0, 8, null, null)
+ ))))
+ }
+
+ @Test
+ def testInnerJoin_NoneEquiJoinKeys(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a1 > a2")
+ }
+
+ @Test
+ def testInnerJoin_NullCountOnLeftJoinKeys(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON a1 = a2")
+ }
+
+ @Test
+ def testInnerJoin_NullCountOnRightJoinKeys(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON b1 = b2")
+ }
+
+ @Test
+ def testInnerJoin_NullCountOnLeftRightJoinKeys(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON c1 = c2")
+ }
+
+ @Test
+ def testInnerJoin_NoNullCount(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON d1 = d2")
+ }
+
+ @Test
+ def testInnerJoin_NullCountLessThanThreshold(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 JOIN MyTable2 ON e1 = e2")
+ }
+
+ @Test
+ def testLeftJoin(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 LEFT JOIN MyTable2 ON c1 = c2")
+ }
+
+ @Test
+ def testRightJoin(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 RIGHT JOIN MyTable2 ON c1 = c2")
+ }
+
+ @Test
+ def testFullJoin(): Unit = {
+ util.verifyPlan("SELECT * FROM MyTable1 FULL JOIN MyTable2 ON c1 = c2")
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
new file mode 100644
index 0000000..b8ad2e7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Tests for [[SimplifyJoinConditionRule]].
+ */
+class SimplifyJoinConditionRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.buildBatchProgram(FlinkBatchProgram.DEFAULT_REWRITE)
+ util.tableEnv.getConfig.getCalciteConfig.getBatchProgram.get.addLast(
+ "SimplifyJoinConditionRule",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(SimplifyJoinConditionRule.INSTANCE))
+ .build()
+ )
+
+ util.addTableSource[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+ util.addTableSource[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
+ }
+
+ @Test
+ def testSimplifyJoinCondition(): Unit = {
+ val sqlQuery = "SELECT d FROM MyTable1 JOIN MyTable2 ON (d = a AND a > 2) OR (d = a AND b = 1)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testSimplifyJoinConditionFromSubQuery(): Unit = {
+ val sqlQuery =
+ """
+ |SELECT a FROM MyTable1 WHERE b = (
+ | SELECT COUNT(*) FROM MyTable2 WHERE (d = a AND d < 2) OR (d = a AND b = 5))
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 0b210f5..ed5feed 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -189,6 +189,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
@Test
def testJoinOnDifferentKeyTypes(): Unit = {
// Will do implicit type coercion.
+ thrown.expect(classOf[TableException])
+ thrown.expectMessage("VARCHAR(65536) and INTEGER does not have common type now")
streamUtil.verifyPlan("SELECT * FROM MyTable AS T JOIN temporalTest "
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id")
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index e304407..fedd7e8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -26,13 +26,15 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
import org.apache.flink.table.`type`.{InternalType, TypeConverters}
import org.apache.flink.table.api._
-import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv, _}
import org.apache.flink.table.calcite.CalciteConfig
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.table.plan.nodes.exec.ExecNode
import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable}
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
import org.apache.flink.table.sinks.{AppendStreamTableSink, CollectRowTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
@@ -47,6 +49,8 @@ import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Rule
import org.junit.rules.{ExpectedException, TestName}
+import _root_.java.util.{Set => JSet}
+
import _root_.scala.collection.JavaConversions._
/**
@@ -134,24 +138,22 @@ abstract class TableTestUtil(test: TableTestBase) {
}
/**
- * Create a [[TestTableSource]] with the given schema,
+ * Create a [[TestTableSource]] with the given schema, table stats and unique keys,
* and registers this TableSource under given name into the TableEnvironment's catalog.
*
* @param name table name
* @param types field types
* @param names field names
+ * @param tableStats table stats
+ * @param uniqueKeys unique keys
* @return returns the registered [[Table]].
*/
def addTableSource(
name: String,
types: Array[TypeInformation[_]],
- names: Array[String]): Table = {
- val tableEnv = getTableEnv
- val schema = new TableSchema(names, types)
- val tableSource = new TestTableSource(schema)
- tableEnv.registerTableSource(name, tableSource)
- tableEnv.scan(name)
- }
+ names: Array[String],
+ tableStats: Option[TableStats] = None,
+ uniqueKeys: Option[JSet[_ <: JSet[String]]] = None): Table
/**
* Create a [[DataStream]] with the given schema,
@@ -472,6 +474,24 @@ case class StreamTableTestUtil(test: TableTestBase) extends TableTestUtil(test)
tableEnv.scan(name)
}
+ override def addTableSource(
+ name: String,
+ types: Array[TypeInformation[_]],
+ names: Array[String],
+ tableStats: Option[TableStats] = None,
+ uniqueKeys: Option[JSet[_ <: JSet[String]]] = None): Table = {
+ val tableEnv = getTableEnv
+ val schema = new TableSchema(names, types)
+ val tableSource = new TestTableSource(schema)
+ val statistic = FlinkStatistic.builder()
+ .tableStats(tableStats.orNull)
+ .uniqueKeys(uniqueKeys.orNull)
+ .build()
+ val table = new StreamTableSourceTable[BaseRow](tableSource, statistic)
+ tableEnv.registerTableInternal(name, table)
+ tableEnv.scan(name)
+ }
+
def verifyPlanWithTrait(): Unit = {
doVerifyPlan(
SqlExplainLevel.EXPPLAN_ATTRIBUTES,
@@ -572,6 +592,24 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
tableEnv.scan(name)
}
+ override def addTableSource(
+ name: String,
+ types: Array[TypeInformation[_]],
+ names: Array[String],
+ tableStats: Option[TableStats] = None,
+ uniqueKeys: Option[JSet[_ <: JSet[String]]] = None): Table = {
+ val tableEnv = getTableEnv
+ val schema = new TableSchema(names, types)
+ val tableSource = new TestTableSource(schema)
+ val statistic = FlinkStatistic.builder()
+ .tableStats(tableStats.orNull)
+ .uniqueKeys(uniqueKeys.orNull)
+ .build()
+ val table = new BatchTableSourceTable[BaseRow](tableSource, statistic)
+ tableEnv.registerTableInternal(name, table)
+ tableEnv.scan(name)
+ }
+
def buildBatchProgram(firstProgramNameToRemove: String): Unit = {
val program = FlinkBatchProgram.buildProgram(tableEnv.getConfig.getConf)
var startRemove = false