You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 00:32:51 UTC

[flink] branch master updated (475c30c -> 9df0a03)

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 475c30c  [FLINK-13155][e2e] fix SQL client end-to-end test
     new 42b04b0  [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner
     new 9df0a03  [FLINK-13076] [table-planner-blink] Support true condition on lookup join condition

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-table/flink-table-planner-blink/pom.xml      |  17 +-
 .../java/org/apache/calcite/rel/core/Join.java     | 341 ---------------------
 .../org/apache/calcite/rel/core/JoinRelType.java   | 184 -----------
 .../apache/calcite/sql2rel/RelDecorrelator.java    |  79 +++--
 .../catalog/FunctionCatalogOperatorTable.java      |   4 +-
 .../table/functions/sql/FlinkSqlOperatorTable.java |  30 +-
 .../plan/rules/logical/FlinkFilterJoinRule.java    |  14 +-
 .../plan/rules/logical/SubQueryDecorrelator.java   |   5 -
 .../table/calcite/FlinkLogicalRelFactories.scala   |   8 +-
 .../table/codegen/CorrelateCodeGenerator.scala     |  12 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |   6 +-
 .../nodes/calcite/LogicalWindowAggregate.scala     |   3 +-
 .../table/plan/nodes/calcite/WindowAggregate.scala |   1 -
 .../plan/nodes/common/CommonPhysicalJoin.scala     |   5 +-
 .../plan/nodes/logical/FlinkLogicalAggregate.scala |  11 +-
 .../plan/nodes/logical/FlinkLogicalCorrelate.scala |   9 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |  10 +-
 .../logical/FlinkLogicalWindowAggregate.scala      |   3 +-
 .../nodes/physical/batch/BatchExecCorrelate.scala  |   8 +-
 .../physical/stream/StreamExecCorrelate.scala      |   6 +-
 .../plan/optimize/program/FlinkBatchProgram.scala  |   1 -
 .../plan/optimize/program/FlinkStreamProgram.scala |   1 -
 .../table/plan/rules/FlinkBatchRuleSets.scala      |   3 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |   3 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |   6 +-
 ...gicalCorrelateToJoinFromTemporalTableRule.scala | 107 +++++--
 .../physical/stream/StreamExecCorrelateRule.scala  |  13 +-
 .../flink/table/plan/util/FlinkRelOptUtil.scala    | 194 +-----------
 .../table/plan/batch/sql/SetOperatorsTest.xml      |   4 +-
 .../table/plan/batch/sql/SubplanReuseTest.xml      |   4 +-
 .../table/plan/batch/sql/join/LookupJoinTest.xml   | 112 ++++---
 .../ReplaceIntersectWithSemiJoinRuleTest.xml       |   6 +-
 .../logical/ReplaceMinusWithAntiJoinRuleTest.xml   |   6 +-
 .../SubqueryCorrelateVariablesValidationTest.xml   | 102 ++++++
 .../table/plan/stream/sql/SetOperatorsTest.xml     |   4 +-
 .../table/plan/stream/sql/SubplanReuseTest.xml     |   4 +-
 .../table/plan/stream/sql/join/LookupJoinTest.xml  |  84 +++--
 .../validation/ScalarFunctionsValidationTest.scala |   6 +-
 .../table/plan/batch/sql/SubplanReuseTest.scala    |   3 +-
 .../table/plan/batch/sql/join/LookupJoinTest.scala |  12 +
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   4 +-
 .../logical/subquery/SubQueryAntiJoinTest.scala    |  14 +-
 .../logical/subquery/SubQuerySemiJoinTest.scala    |  12 +-
 .../SubqueryCorrelateVariablesValidationTest.scala |   7 +-
 .../table/plan/stream/sql/SubplanReuseTest.scala   |   3 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      |  17 +-
 .../batch/sql/agg/PruneAggregateCallITCase.scala   |   2 +-
 flink-table/flink-table-runtime-blink/pom.xml      |   6 +-
 48 files changed, 530 insertions(+), 966 deletions(-)
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
 create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml


[flink] 02/02: [FLINK-13076] [table-planner-blink] Support true condition on lookup join condition

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9df0a032b023180aa113b6f2e68ad6ef27e0c7d6
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 8 17:05:25 2019 +0800

    [FLINK-13076] [table-planner-blink] Support true condition on lookup join condition
    
    This closes #8962
---
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  3 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  3 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |  6 +-
 ...gicalCorrelateToJoinFromTemporalTableRule.scala | 76 +++++++++++++++++-----
 .../table/plan/batch/sql/join/LookupJoinTest.xml   | 28 ++++++++
 .../table/plan/stream/sql/join/LookupJoinTest.xml  | 28 ++++++++
 .../table/plan/batch/sql/join/LookupJoinTest.scala | 12 ++++
 .../plan/stream/sql/join/LookupJoinTest.scala      | 13 ++++
 8 files changed, 149 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index cce4016..7a1ba34 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -54,7 +54,8 @@ object FlinkBatchRuleSets {
     * can create new plan nodes.
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
+    LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
+    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
     TableScanRule.INSTANCE)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 35da87e..a95ef31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -54,7 +54,8 @@ object FlinkStreamRuleSets {
     * can create new plan nodes.
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
+    LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
+    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
     LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
     TableScanRule.INSTANCE)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index 6b8f075..c4870b1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -24,11 +24,12 @@ import org.apache.flink.table.expressions.{FieldReferenceExpression, _}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl}
 import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
 import org.apache.flink.table.plan.util.TemporalJoinUtil.{makeProcTimeTemporalJoinConditionCall, makeRowTimeTemporalJoinConditionCall}
+import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
 import org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot, isProctimeAttribute}
 import org.apache.flink.util.Preconditions.checkState
+
 import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
@@ -45,8 +46,7 @@ import org.apache.calcite.rex._
 class LogicalCorrelateToJoinFromTemporalTableFunctionRule
   extends RelOptRule(
     operand(classOf[LogicalCorrelate],
-      some(
-        operand(classOf[RelNode], any()),
+      some(operand(classOf[RelNode], any()),
         operand(classOf[TableFunctionScan], none()))),
     "LogicalCorrelateToJoinFromTemporalTableFunctionRule") {
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 38f677e..15e3070 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.plan.rules.logical
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
 import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
@@ -29,26 +29,24 @@ import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, R
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
   * might be translated into
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
-  *
-  * TODO supports `true` join condition
   */
-class LogicalCorrelateToJoinFromTemporalTableRule
-  extends RelOptRule(
-    operand(classOf[LogicalCorrelate],
-      operand(classOf[RelNode], any()),
-      operand(classOf[LogicalFilter],
-        operand(classOf[LogicalSnapshot], any()))),
-    "LogicalCorrelateToJoinFromTemporalTableRule") {
+abstract class LogicalCorrelateToJoinFromTemporalTableRule(
+    operand: RelOptRuleOperand,
+    description: String)
+  extends RelOptRule(operand, description) {
+
+  def getFilterCondition(call: RelOptRuleCall): RexNode
+
+  def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
 
   override def onMatch(call: RelOptRuleCall): Unit = {
     val correlate: LogicalCorrelate = call.rel(0)
     val leftInput: RelNode = call.rel(1)
-    val filter: LogicalFilter = call.rel(2)
-    val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
+    val filterCondition = getFilterCondition(call)
+    val snapshot = getLogicalSnapshot(call)
 
     val leftRowType = leftInput.getRowType
-    val condition = filter.getCondition
-    val joinCondition = condition.accept(new RexShuttle() {
+    val joinCondition = filterCondition.accept(new RexShuttle() {
       // change correlate variable expression to normal RexInputRef (which is from left side)
       override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
         fieldAccess.getReferenceExpr match {
@@ -78,6 +76,54 @@ class LogicalCorrelateToJoinFromTemporalTableRule
 
 }
 
+/**
+  * Planner rule that matches a temporal table join whose join condition is not `true`,
+  * which means the right input of the Correlate is a Filter.
+  * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+  * ON T.a = D.id
+  */
+class LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
+  extends LogicalCorrelateToJoinFromTemporalTableRule(
+    operand(classOf[LogicalCorrelate],
+      operand(classOf[RelNode], any()),
+      operand(classOf[LogicalFilter],
+        operand(classOf[LogicalSnapshot], any()))),
+    "LogicalCorrelateToJoinFromTemporalTableRuleWithFilter"
+  ) {
+
+  override def getFilterCondition(call: RelOptRuleCall): RexNode = {
+    val filter: LogicalFilter = call.rel(2)
+    filter.getCondition
+  }
+
+  override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+    call.rels(3).asInstanceOf[LogicalSnapshot]
+  }
+}
+
+/**
+  * Planner rule that matches a temporal table join whose join condition is `true`,
+  * which means the right input of the Correlate is a Snapshot.
+  * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON true
+  */
+class LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
+  extends LogicalCorrelateToJoinFromTemporalTableRule(
+    operand(classOf[LogicalCorrelate],
+      operand(classOf[RelNode], any()),
+      operand(classOf[LogicalSnapshot], any())),
+    "LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter"
+  ) {
+
+  override def getFilterCondition(call: RelOptRuleCall): RexNode = {
+    call.builder().literal(true)
+  }
+
+  override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+    call.rels(2).asInstanceOf[LogicalSnapshot]
+  }
+}
+
 object LogicalCorrelateToJoinFromTemporalTableRule {
-  val INSTANCE = new LogicalCorrelateToJoinFromTemporalTableRule
+  val WITH_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
+  val WITHOUT_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index 0377621..8eb5180 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -236,6 +236,34 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinTemporalTableWithTrueCondition">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ON true
+WHERE T.c > 1000
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
++- LogicalFilter(condition=[>($2, 1000)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[>(c, 1000)])
+   +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testReusing">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
index 8e56ae1..358f601 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
@@ -242,4 +242,32 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinTemporalTableWithTrueCondition">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ON true
+WHERE T.c > 1000
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalFilter(condition=[>($2, 1000)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- Calc(select=[a, b, c, proctime, rowtime], where=[>(c, 1000)])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index b1083d0..d9067a43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -215,6 +215,18 @@ class LookupJoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinTemporalTableWithTrueCondition(): Unit = {
+    val sql =
+      """
+        |SELECT * FROM MyTable AS T
+        |JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+        |ON true
+        |WHERE T.c > 1000
+      """.stripMargin
+    testUtil.verifyPlan(sql)
+  }
+
+  @Test
   def testReusing(): Unit = {
     testUtil.tableEnv.getConfig.getConf.setBoolean(
       OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 1294ff4..d01616a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -317,6 +317,19 @@ class LookupJoinTest extends TableTestBase with Serializable {
     streamUtil.verifyPlan(sql)
   }
 
+  @Test
+  def testJoinTemporalTableWithTrueCondition(): Unit = {
+    val sql =
+      """
+        |SELECT * FROM MyTable AS T
+        |JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+        |ON true
+        |WHERE T.c > 1000
+      """.stripMargin
+
+    streamUtil.verifyPlan(sql)
+  }
+
   // ==========================================================================================
 
   private def expectExceptionThrown(


[flink] 01/02: [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner

Posted by ku...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42b04b01976206b6fc7ebcb58871b627463171f0
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jul 3 16:53:19 2019 +0800

    [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner
---
 flink-table/flink-table-planner-blink/pom.xml      |  17 +-
 .../java/org/apache/calcite/rel/core/Join.java     | 341 ---------------------
 .../org/apache/calcite/rel/core/JoinRelType.java   | 184 -----------
 .../apache/calcite/sql2rel/RelDecorrelator.java    |  79 +++--
 .../catalog/FunctionCatalogOperatorTable.java      |   4 +-
 .../table/functions/sql/FlinkSqlOperatorTable.java |  30 +-
 .../plan/rules/logical/FlinkFilterJoinRule.java    |  14 +-
 .../plan/rules/logical/SubQueryDecorrelator.java   |   5 -
 .../table/calcite/FlinkLogicalRelFactories.scala   |   8 +-
 .../table/codegen/CorrelateCodeGenerator.scala     |  12 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |   6 +-
 .../nodes/calcite/LogicalWindowAggregate.scala     |   3 +-
 .../table/plan/nodes/calcite/WindowAggregate.scala |   1 -
 .../plan/nodes/common/CommonPhysicalJoin.scala     |   5 +-
 .../plan/nodes/logical/FlinkLogicalAggregate.scala |  11 +-
 .../plan/nodes/logical/FlinkLogicalCorrelate.scala |   9 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |  10 +-
 .../logical/FlinkLogicalWindowAggregate.scala      |   3 +-
 .../nodes/physical/batch/BatchExecCorrelate.scala  |   8 +-
 .../physical/stream/StreamExecCorrelate.scala      |   6 +-
 .../plan/optimize/program/FlinkBatchProgram.scala  |   1 -
 .../plan/optimize/program/FlinkStreamProgram.scala |   1 -
 ...gicalCorrelateToJoinFromTemporalTableRule.scala |  49 ++-
 .../physical/stream/StreamExecCorrelateRule.scala  |  13 +-
 .../flink/table/plan/util/FlinkRelOptUtil.scala    | 194 +-----------
 .../table/plan/batch/sql/SetOperatorsTest.xml      |   4 +-
 .../table/plan/batch/sql/SubplanReuseTest.xml      |   4 +-
 .../table/plan/batch/sql/join/LookupJoinTest.xml   |  84 ++---
 .../ReplaceIntersectWithSemiJoinRuleTest.xml       |   6 +-
 .../logical/ReplaceMinusWithAntiJoinRuleTest.xml   |   6 +-
 .../SubqueryCorrelateVariablesValidationTest.xml   | 102 ++++++
 .../table/plan/stream/sql/SetOperatorsTest.xml     |   4 +-
 .../table/plan/stream/sql/SubplanReuseTest.xml     |   4 +-
 .../table/plan/stream/sql/join/LookupJoinTest.xml  |  56 ++--
 .../validation/ScalarFunctionsValidationTest.scala |   6 +-
 .../table/plan/batch/sql/SubplanReuseTest.scala    |   3 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   4 +-
 .../logical/subquery/SubQueryAntiJoinTest.scala    |  14 +-
 .../logical/subquery/SubQuerySemiJoinTest.scala    |  12 +-
 .../SubqueryCorrelateVariablesValidationTest.scala |   7 +-
 .../table/plan/stream/sql/SubplanReuseTest.scala   |   3 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      |   4 +-
 .../batch/sql/agg/PruneAggregateCallITCase.scala   |   2 +-
 flink-table/flink-table-runtime-blink/pom.xml      |   6 +-
 44 files changed, 390 insertions(+), 955 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 461d63e..b0ba76f 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -150,17 +150,13 @@ under the License.
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
 			<!-- When updating the Calcite version, make sure to update the dependency exclusions -->
-			<version>1.19.0</version>
+			<version>1.20.0</version>
 			<exclusions>
 				<!--
+				"mvn dependency:tree" as of Calcite 1.20:
 
-				Dependencies that are not needed for how we use Calcite right now.
-
-				"mvn dependency:tree" as of Calcite 1.19:
-
-				[INFO] +- org.apache.calcite:calcite-core:jar:1.19.0:compile
-				[INFO] |  +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
-				[INFO] |  +- org.apache.calcite:calcite-linq4j:jar:1.19.0:compile
+				[INFO] +- org.apache.calcite:calcite-core:jar:1.20.0:compile
+				[INFO] |  +- org.apache.calcite:calcite-linq4j:jar:1.20.0:compile
 				[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
 				[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
 				[INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.6:compile
@@ -168,6 +164,7 @@ under the License.
 				[INFO] |  +- com.google.guava:guava:jar:19.0:compile
 				[INFO] |  \- com.jayway.jsonpath:json-path:jar:2.4.0:compile
 
+				Dependencies that are not needed for how we use Calcite right now.
 				-->
 				<exclusion>
 					<groupId>org.apache.calcite.avatica</groupId>
@@ -205,6 +202,10 @@ under the License.
 					<groupId>net.hydromatic</groupId>
 					<artifactId>aggdesigner-algorithm</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
deleted file mode 100644
index 657efbf..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.rel.RelNode.Context;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.metadata.RelMdUtil;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexChecker;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Litmus;
-import org.apache.calcite.util.Util;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * This class is copied from https://github.com/apache/calcite/pull/1157 to supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- */
-
-/**
- * Relational expression that combines two relational expressions according to
- * some condition.
- *
- * <p>Each output row has columns from the left and right inputs.
- * The set of output rows is a subset of the cartesian product of the two
- * inputs; precisely which subset depends on the join condition.
- */
-public abstract class Join extends BiRel {
-  //~ Instance fields --------------------------------------------------------
-
-  protected final RexNode condition;
-  protected final ImmutableSet<CorrelationId> variablesSet;
-
-  /**
-   * Values must be of enumeration {@link JoinRelType}, except that
-   * {@link JoinRelType#RIGHT} is disallowed.
-   */
-  protected final JoinRelType joinType;
-
-  protected final JoinInfo joinInfo;
-
-  //~ Constructors -----------------------------------------------------------
-
-  // Next time we need to change the constructor of Join, let's change the
-  // "Set<String> variablesStopped" parameter to
-  // "Set<CorrelationId> variablesSet". At that point we would deprecate
-  // RelNode.getVariablesStopped().
-
-  /**
-   * Creates a Join.
-   *
-   * <p>Note: We plan to change the {@code variablesStopped} parameter to
-   * {@code Set&lt;CorrelationId&gt; variablesSet}
-   * {@link org.apache.calcite.util.Bug#upgrade(String) before version 2.0},
-   * because {@link #getVariablesSet()}
-   * is preferred over {@link #getVariablesStopped()}.
-   * This constructor is not deprecated, for now, because maintaining overloaded
-   * constructors in multiple sub-classes would be onerous.
-   *
-   * @param cluster          Cluster
-   * @param traitSet         Trait set
-   * @param left             Left input
-   * @param right            Right input
-   * @param condition        Join condition
-   * @param joinType         Join type
-   * @param variablesSet     Set variables that are set by the
-   *                         LHS and used by the RHS and are not available to
-   *                         nodes above this Join in the tree
-   */
-  protected Join(
-      RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode left,
-      RelNode right,
-      RexNode condition,
-      Set<CorrelationId> variablesSet,
-      JoinRelType joinType) {
-    super(cluster, traitSet, left, right);
-    this.condition = Objects.requireNonNull(condition);
-    this.variablesSet = ImmutableSet.copyOf(variablesSet);
-    this.joinType = Objects.requireNonNull(joinType);
-    this.joinInfo = JoinInfo.of(left, right, condition);
-  }
-
-  @Deprecated // to be removed before 2.0
-  protected Join(
-      RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode left,
-      RelNode right,
-      RexNode condition,
-      JoinRelType joinType,
-      Set<String> variablesStopped) {
-    this(cluster, traitSet, left, right, condition,
-        CorrelationId.setOf(variablesStopped), joinType);
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  @Override public List<RexNode> getChildExps() {
-    return ImmutableList.of(condition);
-  }
-
-  @Override public RelNode accept(RexShuttle shuttle) {
-    RexNode condition = shuttle.apply(this.condition);
-    if (this.condition == condition) {
-      return this;
-    }
-    return copy(traitSet, condition, left, right, joinType, isSemiJoinDone());
-  }
-
-  public RexNode getCondition() {
-    return condition;
-  }
-
-  public JoinRelType getJoinType() {
-    return joinType;
-  }
-
-  @Override public boolean isValid(Litmus litmus, Context context) {
-    if (!super.isValid(litmus, context)) {
-      return false;
-    }
-    if (getRowType().getFieldCount()
-        != getSystemFieldList().size()
-        + left.getRowType().getFieldCount()
-        + (joinType.projectsRight() ? right.getRowType().getFieldCount() : 0)) {
-      return litmus.fail("field count mismatch");
-    }
-    if (condition != null) {
-      if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
-        return litmus.fail("condition must be boolean: {}",
-            condition.getType());
-      }
-      // The input to the condition is a row type consisting of system
-      // fields, left fields, and right fields. Very similar to the
-      // output row type, except that fields have not yet been made due
-      // due to outer joins.
-      RexChecker checker =
-          new RexChecker(
-              getCluster().getTypeFactory().builder()
-                  .addAll(getSystemFieldList())
-                  .addAll(getLeft().getRowType().getFieldList())
-                  .addAll(getRight().getRowType().getFieldList())
-                  .build(),
-              context, litmus);
-      condition.accept(checker);
-      if (checker.getFailureCount() > 0) {
-        return litmus.fail(checker.getFailureCount()
-            + " failures in condition " + condition);
-      }
-    }
-    return litmus.succeed();
-  }
-
-  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
-      RelMetadataQuery mq) {
-    // Maybe we should remove this for semi-join ?
-    if (!joinType.projectsRight()) {
-      // REVIEW jvs 9-Apr-2006:  Just for now...
-      return planner.getCostFactory().makeTinyCost();
-    }
-    double rowCount = mq.getRowCount(this);
-    return planner.getCostFactory().makeCost(rowCount, 0, 0);
-  }
-
-  /** @deprecated Use {@link RelMdUtil#getJoinRowCount(RelMetadataQuery, Join, RexNode)}. */
-  @Deprecated // to be removed before 2.0
-  public static double estimateJoinedRows(
-      Join joinRel,
-      RexNode condition) {
-    final RelMetadataQuery mq = RelMetadataQuery.instance();
-    return Util.first(RelMdUtil.getJoinRowCount(mq, joinRel, condition), 1D);
-  }
-
-  @Override public double estimateRowCount(RelMetadataQuery mq) {
-    return Util.first(RelMdUtil.getJoinRowCount(mq, this, condition), 1D);
-  }
-
-  @Override public Set<CorrelationId> getVariablesSet() {
-    return variablesSet;
-  }
-
-  @Override public RelWriter explainTerms(RelWriter pw) {
-    return super.explainTerms(pw)
-        .item("condition", condition)
-        .item("joinType", joinType.lowerName)
-        .itemIf(
-            "systemFields",
-            getSystemFieldList(),
-            !getSystemFieldList().isEmpty());
-  }
-
-  @Override protected RelDataType deriveRowType() {
-    assert getSystemFieldList() != null;
-    RelDataType leftType = left.getRowType();
-    RelDataType rightType = right.getRowType();
-    RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
-    switch (joinType) {
-      case LEFT:
-        rightType = typeFactory.createTypeWithNullability(rightType, true);
-        break;
-      case RIGHT:
-        leftType = typeFactory.createTypeWithNullability(leftType, true);
-        break;
-      case FULL:
-        leftType = typeFactory.createTypeWithNullability(leftType, true);
-        rightType = typeFactory.createTypeWithNullability(rightType, true);
-        break;
-      case SEMI:
-      case ANTI:
-        rightType = null;
-      default:
-        break;
-    }
-    return createJoinType(typeFactory, leftType, rightType, null, getSystemFieldList());
-  }
-
-  /**
-   * Returns whether this LogicalJoin has already spawned a
-   * {@code SemiJoin} via
-   * {@link org.apache.calcite.rel.rules.JoinAddRedundantSemiJoinRule}.
-   *
-   * <p>The base implementation returns false.</p>
-   *
-   * @return whether this join has already spawned a semi join
-   */
-  public boolean isSemiJoinDone() {
-    return false;
-  }
-
-  /**
-   * Returns whether this Join is a semijoin.
-   *
-   * @return true if this Join's join type is semi.
-   */
-  public boolean isSemiJoin() {
-    return joinType == JoinRelType.SEMI;
-  }
-
-  /**
-   * Returns a list of system fields that will be prefixed to
-   * output row type.
-   *
-   * @return list of system fields
-   */
-  public List<RelDataTypeField> getSystemFieldList() {
-    return Collections.emptyList();
-  }
-
-  @Deprecated // to be removed before 2.0
-  public static RelDataType deriveJoinRowType(
-      RelDataType leftType,
-      RelDataType rightType,
-      JoinRelType joinType,
-      RelDataTypeFactory typeFactory,
-      List<String> fieldNameList,
-      List<RelDataTypeField> systemFieldList) {
-    return SqlValidatorUtil.deriveJoinRowType(leftType, rightType, joinType,
-        typeFactory, fieldNameList, systemFieldList);
-  }
-
-  @Deprecated // to be removed before 2.0
-  public static RelDataType createJoinType(
-      RelDataTypeFactory typeFactory,
-      RelDataType leftType,
-      RelDataType rightType,
-      List<String> fieldNameList,
-      List<RelDataTypeField> systemFieldList) {
-    return SqlValidatorUtil.createJoinType(typeFactory, leftType, rightType,
-        fieldNameList, systemFieldList);
-  }
-
-  @Override public final Join copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    assert inputs.size() == 2;
-    return copy(traitSet, getCondition(), inputs.get(0), inputs.get(1),
-        joinType, isSemiJoinDone());
-  }
-
-  /**
-   * Creates a copy of this join, overriding condition, system fields and
-   * inputs.
-   *
-   * <p>General contract as {@link RelNode#copy}.
-   *
-   * @param traitSet      Traits
-   * @param conditionExpr Condition
-   * @param left          Left input
-   * @param right         Right input
-   * @param joinType      Join type
-   * @param semiJoinDone  Whether this join has been translated to a
-   *                      semi-join
-   * @return Copy of this join
-   */
-  public abstract Join copy(RelTraitSet traitSet, RexNode conditionExpr,
-      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone);
-
-  /**
-   * Analyzes the join condition.
-   *
-   * @return Analyzed join condition
-   */
-  public JoinInfo analyzeCondition() {
-    return joinInfo;
-  }
-}
-
-// End Join.java
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
deleted file mode 100644
index 8e69d15..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import org.apache.calcite.linq4j.CorrelateJoinType;
-
-import java.util.Locale;
-
-/**
- * This class is copied from https://github.com/apache/calcite/pull/1157 to supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- */
-
-/**
- * Enumeration of join types.
- */
-public enum JoinRelType {
-  /**
-   * Inner join.
-   */
-  INNER,
-
-  /**
-   * Left-outer join.
-   */
-  LEFT,
-
-  /**
-   * Right-outer join.
-   */
-  RIGHT,
-
-  /**
-   * Full-outer join.
-   */
-  FULL,
-
-  /**
-   * Semi-join.
-   *
-   * <p>For example, {@code EMP semi-join DEPT} finds all {@code EMP} records
-   * that have a corresponding {@code DEPT} record:
-   *
-   * <blockquote><pre>
-   * SELECT * FROM EMP
-   * WHERE EXISTS (SELECT 1 FROM DEPT
-   *     WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
-   * </blockquote>
-   */
-  SEMI,
-
-  /**
-   * Anti-join.
-   *
-   * <p>For example, {@code EMP anti-join DEPT} finds all {@code EMP} records
-   * that do not have a corresponding {@code DEPT} record:
-   *
-   * <blockquote><pre>
-   * SELECT * FROM EMP
-   * WHERE NOT EXISTS (SELECT 1 FROM DEPT
-   *     WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
-   * </blockquote>
-   */
-  ANTI;
-
-  /** Lower-case name. */
-  public final String lowerName = name().toLowerCase(Locale.ROOT);
-
-  /**
-   * Returns whether a join of this type may generate NULL values on the
-   * right-hand side.
-   */
-  public boolean generatesNullsOnRight() {
-    return (this == LEFT) || (this == FULL);
-  }
-
-  /**
-   * Returns whether a join of this type may generate NULL values on the
-   * left-hand side.
-   */
-  public boolean generatesNullsOnLeft() {
-    return (this == RIGHT) || (this == FULL);
-  }
-
-  /**
-   * Swaps left to right, and vice versa.
-   */
-  public JoinRelType swap() {
-    switch (this) {
-    case LEFT:
-      return RIGHT;
-    case RIGHT:
-      return LEFT;
-    default:
-      return this;
-    }
-  }
-
-  /** Returns whether this join type generates nulls on side #{@code i}. */
-  public boolean generatesNullsOn(int i) {
-    switch (i) {
-    case 0:
-      return generatesNullsOnLeft();
-    case 1:
-      return generatesNullsOnRight();
-    default:
-      throw new IllegalArgumentException("invalid: " + i);
-    }
-  }
-
-  /** Returns a join type similar to this but that does not generate nulls on
-   * the left. */
-  public JoinRelType cancelNullsOnLeft() {
-    switch (this) {
-    case RIGHT:
-      return INNER;
-    case FULL:
-      return LEFT;
-    default:
-      return this;
-    }
-  }
-
-  /** Returns a join type similar to this but that does not generate nulls on
-   * the right. */
-  public JoinRelType cancelNullsOnRight() {
-    switch (this) {
-    case LEFT:
-      return INNER;
-    case FULL:
-      return RIGHT;
-    default:
-      return this;
-    }
-  }
-
-  /** Transform this JoinRelType to CorrelateJoinType. **/
-  public CorrelateJoinType toLinq4j() {
-    switch (this) {
-    case INNER:
-      return CorrelateJoinType.INNER;
-    case LEFT:
-      return CorrelateJoinType.LEFT;
-    case SEMI:
-      return CorrelateJoinType.SEMI;
-    case ANTI:
-      return CorrelateJoinType.ANTI;
-    }
-    throw new IllegalStateException(
-        "Unable to convert " + this + " to CorrelateJoinType");
-  }
-
-  public boolean projectsRight() {
-    switch (this) {
-    case INNER:
-    case LEFT:
-    case RIGHT:
-    case FULL:
-      return true;
-    case SEMI:
-    case ANTI:
-      return false;
-    }
-    throw new IllegalStateException(
-        "Unable to convert " + this + " to JoinRelType");
-  }
-}
-
-// End JoinRelType.java
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index cef2de0..d260514 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -79,7 +79,6 @@ import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexSubQuery;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlFunction;
@@ -119,7 +118,7 @@ import java.util.TreeMap;
 
 /**
  *  This class is copied from Apache Calcite except that it supports SEMI/ANTI join.
- *  NOTES: This file should be deleted when upgrading to a new calcite version which contains CALCITE-2969.
+ *  NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are fixed.
  */
 
 /**
@@ -263,6 +262,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       // has been rewritten; apply rules post-decorrelation
       final HepProgram program2 = HepProgram.builder()
           .addRuleInstance(
+              // use FilterJoinRule instead of FlinkFilterJoinRule once CALCITE-3170 is fixed
               new FlinkFilterJoinRule.FlinkFilterIntoJoinRule(
                   true, f,
                   FlinkFilterJoinRule.TRUE_PREDICATE))
@@ -474,8 +474,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
     final RelNode newInput = frame.r;
 
+    // aggregate outputs mapping: group keys and aggregates
+    final Map<Integer, Integer> outputMap = new HashMap<>();
+
     // map from newInput
-    Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>();
+    final Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>();
     final int oldGroupKeyCount = rel.getGroupSet().cardinality();
 
     // Project projects the original expressions,
@@ -497,6 +500,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
         omittedConstants.put(i, constant);
         continue;
       }
+
+      // add mapping of group keys.
+      outputMap.put(i, newPos);
       int newInputPos = frame.oldToNewOutputs.get(i);
       projects.add(RexInputRef.of2(newInputPos, newInputOutput));
       mapNewInputToProjOutputs.put(newInputPos, newPos);
@@ -600,7 +606,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
       // The old to new output position mapping will be the same as that
       // of newProject, plus any aggregates that the oldAgg produces.
-      combinedMap.put(
+      outputMap.put(
           oldInputOutputFieldCount + i,
           newInputOutputFieldCount + i);
     }
@@ -612,15 +618,37 @@ public class RelDecorrelator implements ReflectiveVisitor {
       final List<RexNode> postProjects = new ArrayList<>(relBuilder.fields());
       for (Map.Entry<Integer, RexLiteral> entry
           : omittedConstants.descendingMap().entrySet()) {
-        postProjects.add(entry.getKey() + frame.corDefOutputs.size(),
-            entry.getValue());
+        int index = entry.getKey() + frame.corDefOutputs.size();
+        postProjects.add(index, entry.getValue());
+        // Shift by 1 every output whose index is greater than or equal to the
+        // inserted index.
+        shiftMapping(outputMap, index, 1);
+        // Then add the constant key mapping.
+        outputMap.put(entry.getKey(), index);
       }
       relBuilder.project(postProjects);
     }
 
     // Aggregate does not change input ordering so corVars will be
     // located at the same position as the input newProject.
-    return register(rel, relBuilder.build(), combinedMap, corDefOutputs);
+    return register(rel, relBuilder.build(), outputMap, corDefOutputs);
+  }
+
+  /**
+   * Shifts every mapped output position at or after {@code startIndex} by {@code offset}.
+   *
+   * @param mapping    mapping whose values are output positions; updated in place
+   * @param startIndex first output position to shift; smaller positions are left untouched
+   * @param offset     amount added to each shifted position
+   */
+  private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex, int offset) {
+    for (Map.Entry<Integer, Integer> entry : mapping.entrySet()) {
+      if (entry.getValue() >= startIndex) {
+        // Update through the entry itself instead of calling put() on the map
+        // while its entry set is being iterated.
+        entry.setValue(entry.getValue() + offset);
+      }
+    }
   }
 
   public Frame getInvoke(RelNode r, RelNode parent) {
@@ -1162,7 +1190,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         RexUtil.composeConjunction(relBuilder.getRexBuilder(), conditions);
     RelNode newJoin =
         LogicalJoin.create(leftFrame.r, rightFrame.r, condition,
-            ImmutableSet.of(), toJoinRelType(rel.getJoinType()));
+            ImmutableSet.of(), rel.getJoinType());
 
     return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs);
   }
@@ -1173,6 +1201,14 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @param rel Join
    */
   public Frame decorrelateRel(LogicalJoin rel) {
+    // For a SEMI/ANTI join, decorrelate its inputs directly,
+    // because the correlate variables can only be propagated from
+    // the left side, which is not supported yet.
+    if (!rel.getJoinType().projectsRight()) {
+      // fix CALCITE-3169
+      return decorrelateRel((RelNode) rel);
+    }
+
     //
     // Rewrite logic:
     //
@@ -1180,10 +1216,6 @@ public class RelDecorrelator implements ReflectiveVisitor {
     // 2. map output positions and produce corVars if any.
     //
 
-    if (!rel.getJoinType().projectsRight()) {
-      return decorrelateRel((RelNode) rel);
-    }
-
     final RelNode oldLeft = rel.getInput(0);
     final RelNode oldRight = rel.getInput(1);
 
@@ -1335,21 +1367,6 @@ public class RelDecorrelator implements ReflectiveVisitor {
         .build();
   }
 
-  private JoinRelType toJoinRelType(SemiJoinType semiJoinType) {
-    switch (semiJoinType) {
-      case INNER:
-        return JoinRelType.INNER;
-      case LEFT:
-        return JoinRelType.LEFT;
-      case SEMI:
-        return JoinRelType.SEMI;
-      case ANTI:
-        return JoinRelType.ANTI;
-      default:
-        throw new IllegalArgumentException("Unsupported type: " + semiJoinType);
-    }
-  }
-
   /**
    * Pulls a {@link Project} above a {@link Correlate} from its RHS input.
    * Enforces nullability for join output.
@@ -1365,7 +1382,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       LogicalProject project,
       Set<Integer> isCount) {
     final RelNode left = correlate.getLeft();
-    final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+    final JoinRelType joinType = correlate.getJoinType();
 
     // now create the new project
     final List<Pair<RexNode, String>> newProjects = new ArrayList<>();
@@ -1867,7 +1884,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       //   Aggregate (groupby (0) single_value())
       //     Project-A (may reference corVar)
       //       rightInput
-      final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+      final JoinRelType joinType = correlate.getJoinType();
 
       // corRel.getCondition was here, however Correlate was updated so it
       // never includes a join condition. The code was not modified for brevity.
@@ -2077,7 +2094,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+      final JoinRelType joinType = correlate.getJoinType();
       // corRel.getCondition was here, however Correlate was updated so it
       // never includes a join condition. The code was not modified for brevity.
       RexNode joinCond = rexBuilder.makeLiteral(true);
@@ -2482,7 +2499,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+      JoinRelType joinType = correlate.getJoinType();
       // corRel.getCondition was here, however Correlate was updated so it
       // never includes a join condition. The code was not modified for brevity.
       RexNode joinCond = relBuilder.literal(true);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
index bd8306f..dc5b918 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
@@ -33,6 +33,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
 
 import java.util.List;
 import java.util.Optional;
@@ -58,7 +59,8 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 			SqlIdentifier opName,
 			SqlFunctionCategory category,
 			SqlSyntax syntax,
-			List<SqlOperator> operatorList) {
+			List<SqlOperator> operatorList,
+			SqlNameMatcher nameMatcher) {
 		if (!opName.isSimple()) {
 			return;
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index 12af928..d3d2e5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -29,9 +29,11 @@ import org.apache.calcite.sql.SqlBinaryOperator;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlGroupedWindowFunction;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
@@ -42,6 +44,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeTransforms;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
 
 import java.util.Arrays;
 import java.util.List;
@@ -89,6 +93,17 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		});
 	}
 
+	@Override
+	public void lookupOperatorOverloads(
+			SqlIdentifier opName,
+			SqlFunctionCategory category,
+			SqlSyntax syntax,
+			List<SqlOperator> operatorList,
+			SqlNameMatcher nameMatcher) {
+		// Ignore the given nameMatcher and match case-insensitively to keep the previous behavior.
+		super.lookupOperatorOverloads(opName, category, syntax, operatorList, SqlNameMatchers.withCaseSensitive(false));
+	}
+
 	// -----------------------------------------------------------------------------
 	// Flink specific built-in scalar SQL functions
 	// -----------------------------------------------------------------------------
@@ -1131,7 +1146,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction CURRENT_TIMESTAMP = SqlStdOperatorTable.CURRENT_TIMESTAMP;
 	public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
 	public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
-	public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
 	public static final SqlOperator SCALAR_QUERY = SqlStdOperatorTable.SCALAR_QUERY;
 	public static final SqlOperator EXISTS = SqlStdOperatorTable.EXISTS;
 	public static final SqlFunction SIN = SqlStdOperatorTable.SIN;
@@ -1148,9 +1162,21 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction PI = SqlStdOperatorTable.PI;
 	public static final SqlFunction RAND = SqlStdOperatorTable.RAND;
 	public static final SqlFunction RAND_INTEGER = SqlStdOperatorTable.RAND_INTEGER;
+	public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
+
+	// TIME FUNCTIONS
+	public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
+	public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
+	public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH;
+	public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK;
+	public static final SqlFunction HOUR = SqlStdOperatorTable.HOUR;
+	public static final SqlFunction MINUTE = SqlStdOperatorTable.MINUTE;
+	public static final SqlFunction SECOND = SqlStdOperatorTable.SECOND;
+	public static final SqlFunction DAYOFYEAR = SqlStdOperatorTable.DAYOFYEAR;
+	public static final SqlFunction DAYOFMONTH = SqlStdOperatorTable.DAYOFMONTH;
+	public static final SqlFunction DAYOFWEEK = SqlStdOperatorTable.DAYOFWEEK;
 	public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
 	public static final SqlFunction TIMESTAMP_DIFF = SqlStdOperatorTable.TIMESTAMP_DIFF;
-	public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
 
 	// MATCH_RECOGNIZE
 	public static final SqlFunction FIRST = SqlStdOperatorTable.FIRST;
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index 3b593de..3513ba7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.rules.logical;
 
-import org.apache.flink.table.plan.util.FlinkRelOptUtil;
-
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -42,12 +40,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
-import static org.apache.calcite.plan.RelOptUtil.conjunctions;
-
 /**
  * This rules is copied from Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule}.
+ * NOTES: This file should be deleted when CALCITE-3170 is fixed.
  * Modification:
- * - Use `FlinkRelOptUtil.classifyFilters` to support SEMI/ANTI join
  * - Handles the ON condition of anti-join can not be pushed down
  */
 
@@ -140,7 +136,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 
 		final List<RexNode> aboveFilters =
 				filter != null
-						? conjunctions(filter.getCondition())
+						? RelOptUtil.conjunctions(filter.getCondition())
 						: new ArrayList<>();
 		final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =
 				com.google.common.collect.ImmutableList.copyOf(aboveFilters);
@@ -150,7 +146,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 		if (smart
 				&& !origAboveFilters.isEmpty()
 				&& join.getJoinType() != JoinRelType.INNER) {
-			joinType = FlinkRelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+			joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
 		}
 
 		final List<RexNode> leftFilters = new ArrayList<>();
@@ -166,7 +162,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 		// filters. They can be pushed down if they are not on the NULL
 		// generating side.
 		boolean filterPushed = false;
-		if (FlinkRelOptUtil.classifyFilters(
+		if (RelOptUtil.classifyFilters(
 				join,
 				aboveFilters,
 				joinType,
@@ -198,7 +194,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 		// pushed down if it does not affect the non-matching set, i.e. it is
 		// not on the side which is preserved.
 		// A ON clause filter of anti-join can not be pushed down.
-		if (!isAntiJoin && FlinkRelOptUtil.classifyFilters(
+		if (!isAntiJoin && RelOptUtil.classifyFilters(
 				join,
 				joinFilters,
 				joinType,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
index a404e3f..2847a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
@@ -29,7 +29,6 @@ import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Project;
@@ -563,10 +562,6 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
 		 * @param rel Aggregate to rewrite
 		 */
 		public Frame decorrelateRel(LogicalAggregate rel) {
-			if (rel.getGroupType() != Aggregate.Group.SIMPLE) {
-				throw new AssertionError(Bug.CALCITE_461_FIXED);
-			}
-
 			// Aggregate itself should not reference corVars.
 			assert !cm.mapRefRelToCorRef.containsKey(rel);
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
index c60bb12..11b0681 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
@@ -32,8 +32,8 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelCollation, RelNode}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.SqlKind.{EXCEPT, INTERSECT, UNION}
-import org.apache.calcite.sql.{SemiJoinType, SqlKind}
 import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
 import org.apache.calcite.util.ImmutableBitSet
 
@@ -139,14 +139,12 @@ object FlinkLogicalRelFactories {
     * Implementation of [[AggregateFactory]] that returns a [[FlinkLogicalAggregate]].
     */
   class AggregateFactoryImpl extends AggregateFactory {
-    @SuppressWarnings(Array("deprecation"))
     def createAggregate(
         input: RelNode,
-        indicator: Boolean,
         groupSet: ImmutableBitSet,
         groupSets: ImmutableList[ImmutableBitSet],
         aggCalls: util.List[AggregateCall]): RelNode = {
-      FlinkLogicalAggregate.create(input, indicator, groupSet, groupSets, aggCalls)
+      FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls)
     }
   }
 
@@ -206,7 +204,7 @@ object FlinkLogicalRelFactories {
         right: RelNode,
         correlationId: CorrelationId,
         requiredColumns: ImmutableBitSet,
-        joinType: SemiJoinType): RelNode = {
+        joinType: JoinRelType): RelNode = {
       FlinkLogicalCorrelate.create(left, right, correlationId, requiredColumns, joinType)
     }
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
index 7091846..89f0a4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
@@ -42,8 +42,8 @@ import org.apache.flink.table.types.{DataType, PlannerTypeUtils}
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.SemiJoinType
 
 import scala.collection.JavaConversions._
 
@@ -58,7 +58,7 @@ object CorrelateCodeGenerator {
       scan: FlinkLogicalTableFunctionScan,
       condition: Option[RexNode],
       outDataType: RelDataType,
-      joinType: SemiJoinType,
+      joinType: JoinRelType,
       parallelism: Int,
       retainHeader: Boolean,
       expression: (RexNode, List[String], Option[List[RexNode]]) => String,
@@ -153,7 +153,7 @@ object CorrelateCodeGenerator {
       swallowInputOnly: Boolean = false,
       udtfType: LogicalType,
       returnType: RowType,
-      joinType: SemiJoinType,
+      joinType: JoinRelType,
       rexCall: RexCall,
       pojoFieldMapping: Option[Array[Int]],
       ruleDescription: String,
@@ -192,7 +192,7 @@ object CorrelateCodeGenerator {
          |""".stripMargin
 
     // 3. left join
-    if (joinType == SemiJoinType.LEFT) {
+    if (joinType == JoinRelType.LEFT) {
       if (swallowInputOnly) {
         // and the returned row table function is empty, collect a null
         val nullRowTerm = CodeGenUtils.newName("nullRow")
@@ -267,8 +267,8 @@ object CorrelateCodeGenerator {
              |""".stripMargin
 
         }
-    } else if (joinType != SemiJoinType.INNER) {
-      throw new TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+    } else if (joinType != JoinRelType.INNER) {
+      throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.")
     }
 
     val genOperator = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 7993d67..0e5dcd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.metadata._
 import org.apache.calcite.rel.{RelNode, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SemiJoinType._
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
@@ -583,8 +582,9 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
     rel.getJoinType match {
-      case ANTI | SEMI => mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
-      case LEFT | INNER =>
+      case JoinRelType.ANTI | JoinRelType.SEMI =>
+        mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
+      case JoinRelType.LEFT | JoinRelType.INNER =>
         val left = rel.getLeft
         val right = rel.getRight
         val leftFieldCount = left.getRowType.getFieldCount
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
index be8288e..b96faf4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -42,7 +42,6 @@ final class LogicalWindowAggregate(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): Aggregate = {
@@ -74,7 +73,7 @@ object LogicalWindowAggregate {
       window: LogicalWindow,
       namedProperties: Seq[PlannerNamedWindowProperty],
       agg: Aggregate): LogicalWindowAggregate = {
-    require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
+    require(agg.getGroupType == Group.SIMPLE)
     val cluster: RelOptCluster = agg.getCluster
     val traitSet: RelTraitSet = cluster.traitSetOf(Convention.NONE)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
index e206b65..433590b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
@@ -48,7 +48,6 @@ abstract class WindowAggregate(
     cluster,
     traitSet,
     child,
-    false,
     groupSet,
     ImmutableList.of(groupSet),
     aggCalls) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
index 65fec34..4a0c303 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
@@ -64,11 +64,10 @@ abstract class CommonPhysicalJoin(
   lazy val inputRowType: RelDataType = joinType match {
     case JoinRelType.SEMI | JoinRelType.ANTI =>
       // Combines inputs' RowType, the result is different from SEMI/ANTI Join's RowType.
-      SqlValidatorUtil.deriveJoinRowType(
+      SqlValidatorUtil.createJoinType(
+        getCluster.getTypeFactory,
         getLeft.getRowType,
         getRight.getRowType,
-        getJoinType,
-        getCluster.getTypeFactory,
         null,
         Collections.emptyList[RelDataTypeField]
       )
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index 70fc662..52d3c41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -43,13 +43,12 @@ class FlinkLogicalAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     child: RelNode,
-    indicator: Boolean,
     groupSet: ImmutableBitSet,
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     /* flag indicating whether to skip SplitAggregateRule */
     var partialFinalType: PartialFinalType = PartialFinalType.NONE)
-  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  extends Aggregate(cluster, traitSet, child, groupSet, groupSets, aggCalls)
   with FlinkLogicalRel {
 
   def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -59,12 +58,11 @@ class FlinkLogicalAggregate(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): Aggregate = {
     new FlinkLogicalAggregate(
-      cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls, partialFinalType)
+      cluster, traitSet, input, groupSet, groupSets, aggCalls, partialFinalType)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
@@ -113,7 +111,6 @@ private class FlinkLogicalAggregateBatchConverter
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
     FlinkLogicalAggregate.create(
       newInput,
-      agg.indicator,
       agg.getGroupSet,
       agg.getGroupSets,
       agg.getAggCallList)
@@ -143,7 +140,6 @@ private class FlinkLogicalAggregateStreamConverter
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
     FlinkLogicalAggregate.create(
       newInput,
-      agg.indicator,
       agg.getGroupSet,
       agg.getGroupSets,
       agg.getAggCallList)
@@ -156,12 +152,11 @@ object FlinkLogicalAggregate {
 
   def create(
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
     val cluster = input.getCluster
     val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
-    new FlinkLogicalAggregate(cluster,traitSet, input, indicator, groupSet, groupSets, aggCalls)
+    new FlinkLogicalAggregate(cluster,traitSet, input, groupSet, groupSets, aggCalls)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
index 54bf96c..3c06542 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -23,9 +23,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{Correlate, CorrelationId}
+import org.apache.calcite.rel.core.{Correlate, CorrelationId, JoinRelType}
 import org.apache.calcite.rel.logical.LogicalCorrelate
-import org.apache.calcite.sql.SemiJoinType
 import org.apache.calcite.util.ImmutableBitSet
 
 /**
@@ -39,7 +38,7 @@ class FlinkLogicalCorrelate(
     right: RelNode,
     correlationId: CorrelationId,
     requiredColumns: ImmutableBitSet,
-    joinType: SemiJoinType)
+    joinType: JoinRelType)
   extends Correlate(cluster, traitSet, left, right, correlationId, requiredColumns, joinType)
   with FlinkLogicalRel {
 
@@ -49,7 +48,7 @@ class FlinkLogicalCorrelate(
       right: RelNode,
       correlationId: CorrelationId,
       requiredColumns: ImmutableBitSet,
-      joinType: SemiJoinType): Correlate = {
+      joinType: JoinRelType): Correlate = {
 
     new FlinkLogicalCorrelate(
       cluster,
@@ -91,7 +90,7 @@ object FlinkLogicalCorrelate {
       right: RelNode,
       correlationId: CorrelationId,
       requiredColumns: ImmutableBitSet,
-      joinType: SemiJoinType): FlinkLogicalCorrelate = {
+      joinType: JoinRelType): FlinkLogicalCorrelate = {
     val cluster = left.getCluster
     val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
     new FlinkLogicalCorrelate(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 2b96e44..41074bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -18,20 +18,20 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.FlinkConventions
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rel.metadata.RelColumnMapping
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode, RexUtil}
-import org.apache.calcite.sql.SemiJoinType
 import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.table.functions.TemporalTableFunction
-import org.apache.flink.table.functions.utils.TableSqlFunction
 
 import java.lang.reflect.Type
 import java.util
@@ -162,7 +162,7 @@ class FlinkLogicalTableFunctionScanConverter
       newScan,
       cluster.createCorrel(), // a dummy CorrelationId
       ImmutableBitSet.of(),
-      SemiJoinType.INNER)
+      JoinRelType.INNER)
   }
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index ebea49c..383cdd1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -50,7 +50,6 @@ class FlinkLogicalWindowAggregate(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): Aggregate = {
@@ -99,7 +98,7 @@ class FlinkLogicalWindowAggregateConverter
 
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalWindowAggregate]
-    require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
+    require(agg.getGroupType == Group.SIMPLE)
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
     val traitSet = newInput.getCluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
index cd792fa..8e3c759 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -30,10 +30,10 @@ import org.apache.flink.table.planner.BatchPlanner
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Correlate
+import org.apache.calcite.rel.core.{Correlate, JoinRelType}
 import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
-import org.apache.calcite.sql.{SemiJoinType, SqlKind}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
 
 import java.util
@@ -51,12 +51,12 @@ class BatchExecCorrelate(
     condition: Option[RexNode],
     projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
-    joinType: SemiJoinType)
+    joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
   with BatchPhysicalRel
   with BatchExecNode[BaseRow] {
 
-  require(joinType == SemiJoinType.INNER || joinType == SemiJoinType.LEFT)
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
 
   override def deriveRowType(): RelDataType = outputRowType
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
index 9b28572..d3324ac 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -29,9 +29,9 @@ import org.apache.flink.table.runtime.AbstractProcessStreamOperator
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
-import org.apache.calcite.sql.SemiJoinType
 
 import java.util
 
@@ -48,12 +48,12 @@ class StreamExecCorrelate(
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
     outputRowType: RelDataType,
-    joinType: SemiJoinType)
+    joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
   with StreamPhysicalRel
   with StreamExecNode[BaseRow] {
 
-  require(joinType == SemiJoinType.INNER || joinType == SemiJoinType.LEFT)
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
 
   override def producesUpdates: Boolean = false
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 96da7b4..edc15e3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -74,7 +74,6 @@ object FlinkBatchProgram {
     )
 
     // rewrite special temporal join plan
-    // TODO remove this program after upgraded to CALCITE-1.20.0 (CALCITE-2004 is fixed)
     chainedProgram.addLast(
       TEMPORAL_JOIN_REWRITE,
       FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index 1020a69..7fac4cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -74,7 +74,6 @@ object FlinkStreamProgram {
         .build())
 
     // rewrite special temporal join plan
-    // TODO remove this program after upgraded to CALCITE-1.20.0 (CALCITE-2004 is fixed)
     chainedProgram.addLast(
       TEMPORAL_JOIN_REWRITE,
       FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 27fc011..38f677e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.table.plan.rules.logical
 
-import org.apache.calcite.plan.RelOptRule.{any, operand, some}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
 
 /**
   * The initial temporal table join (FOR SYSTEM_TIME AS OF) is a Correlate, rewrite it into a Join
@@ -28,29 +29,51 @@ import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalS
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
   * might be translated into
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
+  *
+  * TODO: support `true` join condition
   */
 class LogicalCorrelateToJoinFromTemporalTableRule
   extends RelOptRule(
-    operand(classOf[LogicalFilter],
-      operand(classOf[LogicalCorrelate], some(
-        operand(classOf[RelNode], any()),
-        operand(classOf[LogicalSnapshot], any())))),
+    operand(classOf[LogicalCorrelate],
+      operand(classOf[RelNode], any()),
+      operand(classOf[LogicalFilter],
+        operand(classOf[LogicalSnapshot], any()))),
     "LogicalCorrelateToJoinFromTemporalTableRule") {
 
   override def onMatch(call: RelOptRuleCall): Unit = {
-    val filterOnCorrelate: LogicalFilter = call.rel(0)
-    val correlate: LogicalCorrelate = call.rel(1)
-    val leftNode: RelNode = call.rel(2)
+    val correlate: LogicalCorrelate = call.rel(0)
+    val leftInput: RelNode = call.rel(1)
+    val filter: LogicalFilter = call.rel(2)
     val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
 
+    val leftRowType = leftInput.getRowType
+    val condition = filter.getCondition
+    val joinCondition = condition.accept(new RexShuttle() {
+      // rewrite a field access on the correlate variable into a RexInputRef on the left input
+      override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+        fieldAccess.getReferenceExpr match {
+          case corVar: RexCorrelVariable =>
+            require(correlate.getCorrelationId.equals(corVar.id))
+            val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+            RexInputRef.of(index, leftRowType)
+          case _ => super.visitFieldAccess(fieldAccess)
+        }
+      }
+
+      // offset each right-side input reference by the number of left fields
+      override def visitInputRef(inputRef: RexInputRef): RexNode = {
+        val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+        new RexInputRef(rightIndex, inputRef.getType)
+      }
+    })
+
     val builder = call.builder()
-    builder.push(leftNode)
+    builder.push(leftInput)
     builder.push(snapshot)
-    builder.join(
-      correlate.getJoinType.toJoinType,
-      filterOnCorrelate.getCondition)
+    builder.join(correlate.getJoinType, joinCondition)
 
-    call.transformTo(builder.build())
+    val rel = builder.build()
+    call.transformTo(rel)
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
index 5b6456b..cf748f1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
@@ -36,8 +36,8 @@ class StreamExecCorrelateRule
     "StreamExecCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val join: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
-    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+    val correlate: FlinkLogicalCorrelate = call.rel(0)
+    val right = correlate.getRight.asInstanceOf[RelSubset].getOriginal
 
     // find only calc and table function
     def findTableFunction(calc: FlinkLogicalCalc): Boolean = {
@@ -59,10 +59,11 @@ class StreamExecCorrelateRule
   }
 
   override def convert(rel: RelNode): RelNode = {
-    val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
+    val correlate = rel.asInstanceOf[FlinkLogicalCorrelate]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.STREAM_PHYSICAL)
-    val right: RelNode = join.getInput(1)
+    val convInput: RelNode = RelOptRule.convert(
+      correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL)
+    val right: RelNode = correlate.getInput(1)
 
 
     def getTableScan(calc: FlinkLogicalCalc): RelNode = {
@@ -115,7 +116,7 @@ class StreamExecCorrelateRule
             scan,
             condition,
             rel.getRowType,
-            join.getJoinType)
+            correlate.getJoinType)
       }
     }
     convertToCorrelate(right, None)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
index 1ac0f30..de71cfc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
@@ -23,10 +23,9 @@ import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
 import org.apache.flink.table.plan.metadata.SelectivityEstimator
 import org.apache.flink.table.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
 
-import com.google.common.collect.{ImmutableList, Lists}
+import com.google.common.collect.Lists
 import org.apache.calcite.config.NullCollation
-import org.apache.calcite.plan.RelOptUtil.InputFinder
-import org.apache.calcite.plan.{RelOptUtil, Strong}
+import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.core.{Join, JoinRelType}
@@ -189,196 +188,9 @@ object FlinkRelOptUtil {
   }
 
   /**
-    * Simplifies outer joins if filter above would reject nulls.
-    *
-    * NOTES: This method should be deleted when upgrading to a new calcite version
-    * which contains CALCITE-2969.
-    *
-    * @param joinRel Join
-    * @param aboveFilters Filters from above
-    * @param joinType Join type, can not be inner join
-    */
-  def simplifyJoin(
-      joinRel: RelNode,
-      aboveFilters: ImmutableList[RexNode],
-      joinType: JoinRelType): JoinRelType = {
-    // No need to simplify if only first input output.
-    if (!joinType.projectsRight()) {
-      return joinType
-    }
-    val nTotalFields = joinRel.getRowType.getFieldCount
-    val nSysFields = 0
-    val nFieldsLeft = joinRel.getInputs.get(0).getRowType.getFieldCount
-    val nFieldsRight = joinRel.getInputs.get(1).getRowType.getFieldCount
-    assert(nTotalFields == nSysFields + nFieldsLeft + nFieldsRight)
-
-    // set the reference bitmaps for the left and right children
-    val leftBitmap = ImmutableBitSet.range(nSysFields, nSysFields + nFieldsLeft)
-    val rightBitmap = ImmutableBitSet.range(nSysFields + nFieldsLeft, nTotalFields)
-
-    var result = joinType
-    for (filter <- aboveFilters) {
-      if (joinType.generatesNullsOnLeft && Strong.isNotTrue(filter, leftBitmap)) {
-        result = result.cancelNullsOnLeft
-      }
-      if (joinType.generatesNullsOnRight && Strong.isNotTrue(filter, rightBitmap)) {
-        result = result.cancelNullsOnRight
-      }
-      if (joinType eq JoinRelType.INNER) {
-        return result
-      }
-    }
-    result
-  }
-
-  /**
-    * Classifies filters according to where they should be processed. They
-    * either stay where they are, are pushed to the join (if they originated
-    * from above the join), or are pushed to one of the children. Filters that
-    * are pushed are added to list passed in as input parameters.
-    *
-    * NOTES: This method should be deleted when upgrading to a new calcite version
-    * which contains CALCITE-2969.
-    *
-    * @param joinRel      join node
-    * @param filters      filters to be classified
-    * @param joinType     join type
-    * @param pushInto     whether filters can be pushed into the ON clause
-    * @param pushLeft     true if filters can be pushed to the left
-    * @param pushRight    true if filters can be pushed to the right
-    * @param joinFilters  list of filters to push to the join
-    * @param leftFilters  list of filters to push to the left child
-    * @param rightFilters list of filters to push to the right child
-    * @return whether at least one filter was pushed
-    */
-  def classifyFilters(
-      joinRel: RelNode,
-      filters: util.List[RexNode],
-      joinType: JoinRelType,
-      pushInto: Boolean,
-      pushLeft: Boolean,
-      pushRight: Boolean,
-      joinFilters: util.List[RexNode],
-      leftFilters: util.List[RexNode],
-      rightFilters: util.List[RexNode]): Boolean = {
-    val rexBuilder = joinRel.getCluster.getRexBuilder
-    val joinFields = joinRel.getRowType.getFieldList
-    val nTotalFields = joinFields.size
-    val nSysFields = 0 // joinRel.getSystemFieldList().size();
-    val leftFields = joinRel.getInputs.get(0).getRowType.getFieldList
-    val nFieldsLeft = leftFields.size
-    val rightFields = joinRel.getInputs.get(1).getRowType.getFieldList
-    val nFieldsRight = rightFields.size
-
-    assert(nTotalFields == (if (joinType.projectsRight()) {
-      nSysFields + nFieldsLeft + nFieldsRight
-    } else {
-      // SEMI/ANTI
-      nSysFields + nFieldsLeft
-    }))
-
-    // set the reference bitmaps for the left and right children
-    val leftBitmap = ImmutableBitSet.range(nSysFields, nSysFields + nFieldsLeft)
-    val rightBitmap = ImmutableBitSet.range(nSysFields + nFieldsLeft, nTotalFields)
-
-    val filtersToRemove = new util.ArrayList[RexNode]
-
-    filters.foreach { filter =>
-      val inputFinder = InputFinder.analyze(filter)
-      val inputBits = inputFinder.inputBitSet.build
-      // REVIEW - are there any expressions that need special handling
-      // and therefore cannot be pushed?
-      // filters can be pushed to the left child if the left child
-      // does not generate NULLs and the only columns referenced in
-      // the filter originate from the left child
-      if (pushLeft && leftBitmap.contains(inputBits)) {
-        // ignore filters that always evaluate to true
-        if (!filter.isAlwaysTrue) {
-          // adjust the field references in the filter to reflect
-          // that fields in the left now shift over by the number
-          // of system fields
-          val shiftedFilter = shiftFilter(
-            nSysFields,
-            nSysFields + nFieldsLeft,
-            -nSysFields,
-            rexBuilder,
-            joinFields,
-            nTotalFields,
-            leftFields,
-            filter)
-          leftFilters.add(shiftedFilter)
-        }
-        filtersToRemove.add(filter)
-
-        // filters can be pushed to the right child if the right child
-        // does not generate NULLs and the only columns referenced in
-        // the filter originate from the right child
-      } else if (pushRight && rightBitmap.contains(inputBits)) {
-        if (!filter.isAlwaysTrue) {
-          // that fields in the right now shift over to the left;
-          // since we never push filters to a NULL generating
-          // child, the types of the source should match the dest
-          // so we don't need to explicitly pass the destination
-          // fields to RexInputConverter
-          val shiftedFilter = shiftFilter(
-            nSysFields + nFieldsLeft,
-            nTotalFields,
-            -(nSysFields + nFieldsLeft),
-            rexBuilder,
-            joinFields,
-            nTotalFields,
-            rightFields,
-            filter)
-          rightFilters.add(shiftedFilter)
-        }
-        filtersToRemove.add(filter)
-      } else {
-        // If the filter can't be pushed to either child and the join
-        // is an inner join, push them to the join if they originated
-        // from above the join
-        if ((joinType eq JoinRelType.INNER) && pushInto) {
-          if (!joinFilters.contains(filter)) {
-            joinFilters.add(filter)
-          }
-          filtersToRemove.add(filter)
-        }
-      }
-    }
-    // Remove filters after the loop, to prevent concurrent modification.
-    if (!filtersToRemove.isEmpty) {
-      filters.removeAll(filtersToRemove)
-    }
-    // Did anything change?
-    !filtersToRemove.isEmpty
-  }
-
-  private def shiftFilter(
-      start: Int,
-      end: Int,
-      offset: Int,
-      rexBuilder: RexBuilder,
-      joinFields: util.List[RelDataTypeField],
-      nTotalFields: Int,
-      rightFields: util.List[RelDataTypeField],
-      filter: RexNode): RexNode = {
-    val adjustments = new Array[Int](nTotalFields)
-    (start until end).foreach {
-      i => adjustments(i) = offset
-    }
-    filter.accept(
-      new RelOptUtil.RexInputConverter(
-        rexBuilder,
-        joinFields,
-        rightFields,
-        adjustments)
-    )
-  }
-
-  /**
     * Pushes down expressions in "equal" join condition.
     *
-    * NOTES: This method should be deleted when upgrading to a new calcite version
-    * which contains CALCITE-2969.
+    * NOTES: This method should be deleted when CALCITE-3171 is fixed.
     *
     * <p>For example, given
     * "emp JOIN dept ON emp.deptno + 1 = dept.deptno", adds a project above
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
index a6746ff..c514ad8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 HashAggregate(isMerge=[false], groupBy=[c], select=[c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
++- HashJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], build=[left])
    :- Exchange(distribution=[hash[c]])
    :  +- Calc(select=[c])
    :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -127,7 +127,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 HashAggregate(isMerge=[false], groupBy=[c], select=[c])
-+- HashJoin(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
++- HashJoin(joinType=[LeftAntiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], build=[left])
    :- Exchange(distribution=[hash[c]])
    :  +- Calc(select=[c])
    :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index b0e8f8b..141cbdf 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -1090,11 +1090,11 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
 :- SortAggregate(isMerge=[false], groupBy=[random], select=[random])
 :  +- Sort(orderBy=[random ASC])
 :     +- Exchange(distribution=[hash[random]])
-:        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+:        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
 :           :- Exchange(distribution=[any], exchange_mode=[BATCH])
 :           :  +- Calc(select=[random], reuse_id=[1])
 :           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index e281fb4..0377621 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -38,11 +38,11 @@ GROUP BY b
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
 +- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
    +- LogicalFilter(condition=[>($7, 10)])
-      +- LogicalFilter(condition=[=($1, $5)])
-         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-            :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+         :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+         :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+         +- LogicalFilter(condition=[=($cor0.a, $0)])
             +- LogicalSnapshot(period=[$cor0.proctime])
                +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -69,10 +69,10 @@ HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_COUNT(count$0) AS EX
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
-+- LogicalFilter(condition=[=($0, $4)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -98,10 +98,10 @@ WHERE T.c > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
 +- LogicalFilter(condition=[>($2, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $4), =($6, 10))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-         :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -122,12 +122,12 @@ Calc(select=[a, b, c, proctime, id, name, CAST(10) AS age])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalFilter(condition=[=($0, $3)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
-      :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
-      :  +- LogicalFilter(condition=[>($2, 1000)])
-      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :        +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+   :  +- LogicalFilter(condition=[>($2, 1000)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -147,10 +147,10 @@ LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
-+- LogicalFilter(condition=[=($0, $4)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -175,10 +175,10 @@ ON T.a = D.id
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4])
-+- LogicalFilter(condition=[=($0, $4)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -213,11 +213,11 @@ GROUP BY b
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
 +- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
    +- LogicalFilter(condition=[>($7, 10)])
-      +- LogicalFilter(condition=[=($1, $5)])
-         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-            :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+         :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+         :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+         +- LogicalFilter(condition=[=($cor0.a, $0)])
             +- LogicalSnapshot(period=[$cor0.proctime])
                +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -274,20 +274,20 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4])
          +- LogicalJoin(condition=[true], joinType=[inner])
             :- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3], proctime=[$4], id=[$5], name=[$6], age=[$7])
             :  +- LogicalFilter(condition=[>($7, 10)])
-            :     +- LogicalFilter(condition=[=($1, $5)])
-            :        +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :           :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-            :           :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :           :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+            :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+            :        :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+            :        :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+            :        :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+            :        +- LogicalFilter(condition=[=($cor0.a, $0)])
             :           +- LogicalSnapshot(period=[$cor0.proctime])
             :              +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
             +- LogicalProject(a=[$5], b=[$0])
                +- LogicalFilter(condition=[>($7, 10)])
-                  +- LogicalFilter(condition=[=($1, $5)])
-                     +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{4}])
-                        :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-                        :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-                        :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1, 4}])
+                     :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+                     :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+                     :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                     +- LogicalFilter(condition=[=($cor1.a, $0)])
                         +- LogicalSnapshot(period=[$cor1.proctime])
                            +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
index 1c3b540..e1e5afd 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
@@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
@@ -57,7 +57,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
    :- LogicalProject(c=[$2])
    :  +- LogicalFilter(condition=[=(1, 0)])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
@@ -83,7 +83,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
index f000d5c..4470529 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
@@ -32,7 +32,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
@@ -57,7 +57,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
    :- LogicalProject(c=[$2])
    :  +- LogicalFilter(condition=[=(1, 0)])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
@@ -83,7 +83,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml
new file mode 100644
index 0000000..9316828
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testWithProjectProjectCorrelate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT (SELECT min(t1.t1d) FROM t3 WHERE t3.t3a = 'test') min_t1d
+FROM   t1
+WHERE  t1a = 'test'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(min_t1d=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
+  LogicalProject($f0=[$cor0.t1d])
+    LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+      LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+})])
++- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(min_t1d=[$10])
++- LogicalJoin(condition=[=($3, $9)], joinType=[left])
+   :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+   +- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)])
+      +- LogicalProject(t1d=[$9], $f0=[$9])
+         +- LogicalJoin(condition=[true], joinType=[inner])
+            :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+            :  +- LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+            +- LogicalAggregate(group=[{0}])
+               +- LogicalProject(t1d=[$3])
+                  +- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithProjectFilterCorrelate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT (SELECT min(t3d) FROM t3 WHERE t3.t3a = t1.t1a) min_t3d,
+       (SELECT max(t2h) FROM t2 WHERE t2.t2a = t1.t1a) max_t2h
+FROM   t1
+    WHERE  t1a = 'test'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(min_t3d=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
+  LogicalProject(t3d=[$3])
+    LogicalFilter(condition=[=($0, $cor0.t1a)])
+      LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+})], max_t2h=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)])
+  LogicalProject(t2h=[$7])
+    LogicalFilter(condition=[=($0, $cor1.t1a)])
+      LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+})])
++- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(min_t3d=[$9], max_t2h=[$11])
++- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+   :- LogicalProject(t1a=[$0], t1b=[$1], t1c=[$2], t1d=[$3], t1e=[$4], t1f=[$5], t1g=[$6], t1h=[$7], t1i=[$8], EXPR$0=[$10])
+   :  +- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :     :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   :     :  +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+   :     +- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)])
+   :        +- LogicalProject(t3a=[$0], t3d=[$3])
+   :           +- LogicalFilter(condition=[IS NOT NULL($0)])
+   :              +- LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+   +- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+      +- LogicalProject(t2a=[$0], t2h=[$7])
+         +- LogicalFilter(condition=[IS NOT NULL($0)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
index 8094d28..66a6422 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
@@ -33,7 +33,7 @@ LogicalIntersect(all=[false])
       <![CDATA[
 GroupAggregate(groupBy=[c], select=[c])
 +- Exchange(distribution=[hash[c]])
-   +- Join(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[c]])
       :  +- Calc(select=[c])
       :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -128,7 +128,7 @@ LogicalMinus(all=[false])
       <![CDATA[
 GroupAggregate(groupBy=[c], select=[c])
 +- Exchange(distribution=[hash[c]])
-   +- Join(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   +- Join(joinType=[LeftAntiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[c]])
       :  +- Calc(select=[c])
       :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
index bfe8327..11df2ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
@@ -668,11 +668,11 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
+Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[hash[random]])
 :  +- GroupAggregate(groupBy=[random], select=[random])
 :     +- Exchange(distribution=[hash[random]])
-:        +- Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:        +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :           :- Exchange(distribution=[hash[random]], reuse_id=[1])
 :           :  +- Calc(select=[random])
 :           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
index a734771..8e56ae1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
@@ -29,9 +29,9 @@ WHERE cast(D.name as bigint) > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
 +- LogicalFilter(condition=[>(CAST($6):BIGINT, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -66,11 +66,11 @@ GROUP BY b
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
 +- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
    +- LogicalFilter(condition=[>($7, 10)])
-      +- LogicalFilter(condition=[=($1, $5)])
-         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proc=[PROCTIME()])
-            :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+         :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proc=[PROCTIME()])
+         :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+         +- LogicalFilter(condition=[=($cor0.a, $0)])
             +- LogicalSnapshot(period=[$cor0.proc])
                +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -95,9 +95,9 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalFilter(condition=[=($0, $5)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -123,9 +123,9 @@ WHERE T.c > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
 +- LogicalFilter(condition=[>($2, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -152,9 +152,9 @@ WHERE T.c > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
 +- LogicalFilter(condition=[>($2, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10), =($6, _UTF-16LE'AAA'))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10), =($1, _UTF-16LE'AAA'))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -175,11 +175,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, C
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalFilter(condition=[=($0, $3)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
-      :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
-      :  +- LogicalFilter(condition=[>($2, 1000)])
-      :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+   :  +- LogicalFilter(condition=[>($2, 1000)])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -200,9 +200,9 @@ Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalFilter(condition=[=($0, $5)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -227,9 +227,9 @@ ON T.a = D.id
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
-+- LogicalFilter(condition=[=($0, $5)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index b6c2b48..bb43ecc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -55,14 +55,14 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
   @Test
   def testTimestampAddWithWrongTimestampInterval(): Unit = {
     thrown.expect(classOf[SqlParserException])
-    testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24'))", "2016-06-16")
+    testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24')", "2016-06-16")
   }
 
   @Test
   def testTimestampAddWithWrongTimestampFormat(): Unit = {
     thrown.expect(classOf[SqlParserException])
-    thrown.expectMessage("Illegal TIMESTAMP literal '2016-02-24'")
-    testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016-02-24'))", "2016-06-16")
+    thrown.expectMessage("Illegal TIMESTAMP literal '2016/02/24'")
+    testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016/02/24')", "2016-06-16")
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
index 9ba36e9..7169f7a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
@@ -58,8 +58,7 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyPlanNotExpected(sqlQuery, "Reused")
   }
 
-  @Test(expected = classOf[AssertionError])
-  // TODO after CALCITE-3020 fixed, remove expected exception
+  @Test
   def testSubplanReuseWithDifferentRowType(): Unit = {
     util.tableEnv.getConfig.getConf.setBoolean(
       OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 111cf9f..8d7c348 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -74,7 +74,7 @@ class FlinkRelMdHandlerTestBase {
 
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
-  // TODO batch RelNode and stream RelNode should have different PlanningConfigurationBuilder
+  // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
   val plannerContext: PlannerContext =
     new PlannerContext(
@@ -728,7 +728,6 @@ class FlinkRelMdHandlerTestBase {
       cluster,
       flinkLogicalTraits,
       studentFlinkLogicalScan,
-      logicalAgg.indicator,
       logicalAgg.getGroupSet,
       logicalAgg.getGroupSets,
       logicalAgg.getAggCallList
@@ -878,7 +877,6 @@ class FlinkRelMdHandlerTestBase {
       cluster,
       flinkLogicalTraits,
       studentFlinkLogicalScan,
-      logicalAggWithAuxGroup.indicator,
       logicalAggWithAuxGroup.getGroupSet,
       logicalAggWithAuxGroup.getGroupSets,
       logicalAggWithAuxGroup.getAggCallList
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
index dd5c042..71dc9a4e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
@@ -213,8 +213,8 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
   def testNotInWithUncorrelatedOnWhere_Case7(): Unit = {
     util.addTableSource[(Int)]("t1", 'i)
 
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+    // TODO some bugs in SubQueryRemoveRule
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT b FROM l WHERE " +
@@ -725,8 +725,10 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
   def testNotInNotExists3(): Unit = {
     util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
 
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+    // TODO some bugs in SubQueryRemoveRule
+    //  the result RelNode (LogicalJoin(condition=[=($1, $11)], joinType=[left]))
+    //  after SubQueryRemoveRule is unexpected
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT c FROM l WHERE (" +
@@ -752,8 +754,8 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
   def testInNotInExistsNotExists2(): Unit = {
     util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
 
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+    // TODO some bugs in SubQueryRemoveRule
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT c FROM l WHERE (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
index c1225f6..944eb02 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
@@ -208,9 +208,8 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
     util.addTableSource[(Int)]("t1", 'i)
     util.addTableSource[(Int)]("t2", 'j)
 
-    thrown.expect(classOf[TableException])
-    // correlate variable id is unstable, ignore here
-    thrown.expectMessage("unexpected correlate variable $cor")
+    // TODO some bugs in SubQueryRemoveRule
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT b FROM l WHERE" +
@@ -1659,9 +1658,10 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
   def testInExists3(): Unit = {
     util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
 
-    thrown.expect(classOf[TableException])
-    // correlate variable id is unstable, ignore here
-    thrown.expectMessage("unexpected correlate variable $cor")
+    // TODO some bugs in SubQueryRemoveRule
+    //  the result RelNode (LogicalJoin(condition=[=($1, $8)], joinType=[left]))
+    //  after SubQueryRemoveRule is unexpected
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT c FROM l WHERE (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
index 70e09aa..f3b07cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
@@ -35,7 +35,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
   util.addTableSource[(String, Short, Int, Long, Float, Double, BigDecimal, Timestamp, Date)](
     "t3", 't3a, 't3b, 't3c, 't3d, 't3e, 't3f, 't3g, 't3h, 't3i)
 
-  @Test(expected = classOf[RuntimeException])
+  @Test
   def testWithProjectProjectCorrelate(): Unit = {
     val sqlQuery =
       """
@@ -46,7 +46,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
     util.verifyPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[RuntimeException])
+  @Test
   def testWithProjectFilterCorrelate(): Unit = {
     val sqlQuery =
       """
@@ -111,7 +111,8 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
     util.verifyPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[RuntimeException])
+  @Test(expected = classOf[AssertionError])
+  // TODO some bugs in RelDecorrelator.AdjustProjectForCountAggregateRule
   def testWithProjectCaseWhenCorrelate(): Unit = {
     val sqlQuery =
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
index 2fa8d39..8a455b7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
@@ -57,8 +57,7 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyPlanNotExpected(sqlQuery, "Reused")
   }
 
-  @Test(expected = classOf[AssertionError])
-  // TODO after CALCITE-3020 fixed, remove expected exception
+  @Test
   def testSubplanReuseWithDifferentRowType(): Unit = {
     util.tableEnv.getConfig.getConf.setBoolean(
       OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index c3a9e2d..1294ff4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -77,8 +77,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id",
-      "Unsupported join type for semi-join RIGHT",
-      classOf[IllegalArgumentException]
+      "Correlate has invalid join type RIGHT",
+      classOf[AssertionError]
     )
 
     // only support join on raw key of right table
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
index cd55c6a..00a8b76 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
@@ -46,7 +46,7 @@ class PruneAggregateCallITCase extends BatchTestBase {
     checkResult(
       """
         |SELECT c, a FROM
-        | (SELECT a, c, COUNT(b) as c, SUM(b) as s FROM MyTable GROUP BY a, c) t
+        | (SELECT a, c, COUNT(b) as cnt, SUM(b) as s FROM MyTable GROUP BY a, c) t
         |WHERE s > 1
       """.stripMargin,
       Seq(row("Hello world", 3), row("Hello", 2))
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index aeea8c1..f407bac 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -72,7 +72,7 @@ under the License.
 			<groupId>org.apache.calcite.avatica</groupId>
 			<artifactId>avatica-core</artifactId>
 			<!-- When updating the Calcite version, make sure to update the version and dependency exclusions -->
-			<version>1.13.0</version>
+			<version>1.15.0</version>
 			<exclusions>
 				<!--
 
@@ -81,9 +81,9 @@ under the License.
 				We exclude all the dependencies of Avatica because currently we only use
 				TimeUnit, TimeUnitRange and SqlDateTimeUtils which only dependent JDK.
 
-				"mvn dependency:tree" as of Avatica 1.13:
+				"mvn dependency:tree" as of Avatica 1.15:
 
-				[INFO] +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
+				[INFO] +- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
 
 				-->
 				<exclusion>