Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 00:32:52 UTC

[flink] 01/02: [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 42b04b01976206b6fc7ebcb58871b627463171f0
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jul 3 16:53:19 2019 +0800

    [FLINK-13076] [table-planner-blink] Bump Calcite dependency to 1.20.0 in blink planner
---
 flink-table/flink-table-planner-blink/pom.xml      |  17 +-
 .../java/org/apache/calcite/rel/core/Join.java     | 341 ---------------------
 .../org/apache/calcite/rel/core/JoinRelType.java   | 184 -----------
 .../apache/calcite/sql2rel/RelDecorrelator.java    |  79 +++--
 .../catalog/FunctionCatalogOperatorTable.java      |   4 +-
 .../table/functions/sql/FlinkSqlOperatorTable.java |  30 +-
 .../plan/rules/logical/FlinkFilterJoinRule.java    |  14 +-
 .../plan/rules/logical/SubQueryDecorrelator.java   |   5 -
 .../table/calcite/FlinkLogicalRelFactories.scala   |   8 +-
 .../table/codegen/CorrelateCodeGenerator.scala     |  12 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |   6 +-
 .../nodes/calcite/LogicalWindowAggregate.scala     |   3 +-
 .../table/plan/nodes/calcite/WindowAggregate.scala |   1 -
 .../plan/nodes/common/CommonPhysicalJoin.scala     |   5 +-
 .../plan/nodes/logical/FlinkLogicalAggregate.scala |  11 +-
 .../plan/nodes/logical/FlinkLogicalCorrelate.scala |   9 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |  10 +-
 .../logical/FlinkLogicalWindowAggregate.scala      |   3 +-
 .../nodes/physical/batch/BatchExecCorrelate.scala  |   8 +-
 .../physical/stream/StreamExecCorrelate.scala      |   6 +-
 .../plan/optimize/program/FlinkBatchProgram.scala  |   1 -
 .../plan/optimize/program/FlinkStreamProgram.scala |   1 -
 ...gicalCorrelateToJoinFromTemporalTableRule.scala |  49 ++-
 .../physical/stream/StreamExecCorrelateRule.scala  |  13 +-
 .../flink/table/plan/util/FlinkRelOptUtil.scala    | 194 +-----------
 .../table/plan/batch/sql/SetOperatorsTest.xml      |   4 +-
 .../table/plan/batch/sql/SubplanReuseTest.xml      |   4 +-
 .../table/plan/batch/sql/join/LookupJoinTest.xml   |  84 ++---
 .../ReplaceIntersectWithSemiJoinRuleTest.xml       |   6 +-
 .../logical/ReplaceMinusWithAntiJoinRuleTest.xml   |   6 +-
 .../SubqueryCorrelateVariablesValidationTest.xml   | 102 ++++++
 .../table/plan/stream/sql/SetOperatorsTest.xml     |   4 +-
 .../table/plan/stream/sql/SubplanReuseTest.xml     |   4 +-
 .../table/plan/stream/sql/join/LookupJoinTest.xml  |  56 ++--
 .../validation/ScalarFunctionsValidationTest.scala |   6 +-
 .../table/plan/batch/sql/SubplanReuseTest.scala    |   3 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   4 +-
 .../logical/subquery/SubQueryAntiJoinTest.scala    |  14 +-
 .../logical/subquery/SubQuerySemiJoinTest.scala    |  12 +-
 .../SubqueryCorrelateVariablesValidationTest.scala |   7 +-
 .../table/plan/stream/sql/SubplanReuseTest.scala   |   3 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      |   4 +-
 .../batch/sql/agg/PruneAggregateCallITCase.scala   |   2 +-
 flink-table/flink-table-runtime-blink/pom.xml      |   6 +-
 44 files changed, 390 insertions(+), 955 deletions(-)
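
Calcite 1.20.0 contains CALCITE-2969, so the planner's private copies of Join.java and
JoinRelType.java (deleted below) are no longer needed: SEMI and ANTI are now part of the
core JoinRelType enum, and the Scala planner nodes switch from SemiJoinType to JoinRelType
accordingly. As a minimal standalone sketch (class name and harness are illustrative, not
part of this commit), the upstream enum answers the same questions the copied one did:

    import org.apache.calcite.rel.core.JoinRelType;

    public class JoinRelTypeProbe {
        public static void main(String[] args) {
            // With Calcite 1.20 the SEMI and ANTI variants are part of the core enum,
            // so the classes copied under org.apache.calcite.rel.core can be removed.
            for (JoinRelType type : JoinRelType.values()) {
                System.out.println(type.lowerName
                    + " projectsRight=" + type.projectsRight()
                    + " generatesNullsOnRight=" + type.generatesNullsOnRight());
            }
        }
    }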

diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 461d63e..b0ba76f 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -150,17 +150,13 @@ under the License.
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
 			<!-- When updating the Calcite version, make sure to update the dependency exclusions -->
-			<version>1.19.0</version>
+			<version>1.20.0</version>
 			<exclusions>
 				<!--
+				"mvn dependency:tree" as of Calcite 1.20:
 
-				Dependencies that are not needed for how we use Calcite right now.
-
-				"mvn dependency:tree" as of Calcite 1.19:
-
-				[INFO] +- org.apache.calcite:calcite-core:jar:1.19.0:compile
-				[INFO] |  +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
-				[INFO] |  +- org.apache.calcite:calcite-linq4j:jar:1.19.0:compile
+				[INFO] +- org.apache.calcite:calcite-core:jar:1.20.0:compile
+				[INFO] |  +- org.apache.calcite:calcite-linq4j:jar:1.20.0:compile
 				[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
 				[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
 				[INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.6:compile
@@ -168,6 +164,7 @@ under the License.
 				[INFO] |  +- com.google.guava:guava:jar:19.0:compile
 				[INFO] |  \- com.jayway.jsonpath:json-path:jar:2.4.0:compile
 
+				Dependencies that are not needed for how we use Calcite right now.
 				-->
 				<exclusion>
 					<groupId>org.apache.calcite.avatica</groupId>
@@ -205,6 +202,10 @@ under the License.
 					<groupId>net.hydromatic</groupId>
 					<artifactId>aggdesigner-algorithm</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
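The upgrade also adds an exclusion for commons-codec above. A quick way to check that the
exclusion takes effect on the planner's compile classpath (the probe class and the choice of
a well-known commons-codec entry point are illustrative, not part of this commit):

    public class CommonsCodecExclusionProbe {
        public static void main(String[] args) {
            try {
                // org.apache.commons.codec.binary.Base64 ships with commons-codec.
                Class.forName("org.apache.commons.codec.binary.Base64");
                System.out.println("commons-codec is still on the classpath");
            } catch (ClassNotFoundException e) {
                System.out.println("commons-codec was excluded as intended");
            }
        }
    }
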
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
deleted file mode 100644
index 657efbf..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/Join.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.rel.RelNode.Context;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.metadata.RelMdUtil;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexChecker;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.util.Litmus;
-import org.apache.calcite.util.Util;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * This class is copied from https://github.com/apache/calcite/pull/1157 to supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- */
-
-/**
- * Relational expression that combines two relational expressions according to
- * some condition.
- *
- * <p>Each output row has columns from the left and right inputs.
- * The set of output rows is a subset of the cartesian product of the two
- * inputs; precisely which subset depends on the join condition.
- */
-public abstract class Join extends BiRel {
-  //~ Instance fields --------------------------------------------------------
-
-  protected final RexNode condition;
-  protected final ImmutableSet<CorrelationId> variablesSet;
-
-  /**
-   * Values must be of enumeration {@link JoinRelType}, except that
-   * {@link JoinRelType#RIGHT} is disallowed.
-   */
-  protected final JoinRelType joinType;
-
-  protected final JoinInfo joinInfo;
-
-  //~ Constructors -----------------------------------------------------------
-
-  // Next time we need to change the constructor of Join, let's change the
-  // "Set<String> variablesStopped" parameter to
-  // "Set<CorrelationId> variablesSet". At that point we would deprecate
-  // RelNode.getVariablesStopped().
-
-  /**
-   * Creates a Join.
-   *
-   * <p>Note: We plan to change the {@code variablesStopped} parameter to
-   * {@code Set&lt;CorrelationId&gt; variablesSet}
-   * {@link org.apache.calcite.util.Bug#upgrade(String) before version 2.0},
-   * because {@link #getVariablesSet()}
-   * is preferred over {@link #getVariablesStopped()}.
-   * This constructor is not deprecated, for now, because maintaining overloaded
-   * constructors in multiple sub-classes would be onerous.
-   *
-   * @param cluster          Cluster
-   * @param traitSet         Trait set
-   * @param left             Left input
-   * @param right            Right input
-   * @param condition        Join condition
-   * @param joinType         Join type
-   * @param variablesSet     Set variables that are set by the
-   *                         LHS and used by the RHS and are not available to
-   *                         nodes above this Join in the tree
-   */
-  protected Join(
-      RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode left,
-      RelNode right,
-      RexNode condition,
-      Set<CorrelationId> variablesSet,
-      JoinRelType joinType) {
-    super(cluster, traitSet, left, right);
-    this.condition = Objects.requireNonNull(condition);
-    this.variablesSet = ImmutableSet.copyOf(variablesSet);
-    this.joinType = Objects.requireNonNull(joinType);
-    this.joinInfo = JoinInfo.of(left, right, condition);
-  }
-
-  @Deprecated // to be removed before 2.0
-  protected Join(
-      RelOptCluster cluster,
-      RelTraitSet traitSet,
-      RelNode left,
-      RelNode right,
-      RexNode condition,
-      JoinRelType joinType,
-      Set<String> variablesStopped) {
-    this(cluster, traitSet, left, right, condition,
-        CorrelationId.setOf(variablesStopped), joinType);
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  @Override public List<RexNode> getChildExps() {
-    return ImmutableList.of(condition);
-  }
-
-  @Override public RelNode accept(RexShuttle shuttle) {
-    RexNode condition = shuttle.apply(this.condition);
-    if (this.condition == condition) {
-      return this;
-    }
-    return copy(traitSet, condition, left, right, joinType, isSemiJoinDone());
-  }
-
-  public RexNode getCondition() {
-    return condition;
-  }
-
-  public JoinRelType getJoinType() {
-    return joinType;
-  }
-
-  @Override public boolean isValid(Litmus litmus, Context context) {
-    if (!super.isValid(litmus, context)) {
-      return false;
-    }
-    if (getRowType().getFieldCount()
-        != getSystemFieldList().size()
-        + left.getRowType().getFieldCount()
-        + (joinType.projectsRight() ? right.getRowType().getFieldCount() : 0)) {
-      return litmus.fail("field count mismatch");
-    }
-    if (condition != null) {
-      if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
-        return litmus.fail("condition must be boolean: {}",
-            condition.getType());
-      }
-      // The input to the condition is a row type consisting of system
-      // fields, left fields, and right fields. Very similar to the
-      // output row type, except that fields have not yet been made due
-      // due to outer joins.
-      RexChecker checker =
-          new RexChecker(
-              getCluster().getTypeFactory().builder()
-                  .addAll(getSystemFieldList())
-                  .addAll(getLeft().getRowType().getFieldList())
-                  .addAll(getRight().getRowType().getFieldList())
-                  .build(),
-              context, litmus);
-      condition.accept(checker);
-      if (checker.getFailureCount() > 0) {
-        return litmus.fail(checker.getFailureCount()
-            + " failures in condition " + condition);
-      }
-    }
-    return litmus.succeed();
-  }
-
-  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
-      RelMetadataQuery mq) {
-    // Maybe we should remove this for semi-join ?
-    if (!joinType.projectsRight()) {
-      // REVIEW jvs 9-Apr-2006:  Just for now...
-      return planner.getCostFactory().makeTinyCost();
-    }
-    double rowCount = mq.getRowCount(this);
-    return planner.getCostFactory().makeCost(rowCount, 0, 0);
-  }
-
-  /** @deprecated Use {@link RelMdUtil#getJoinRowCount(RelMetadataQuery, Join, RexNode)}. */
-  @Deprecated // to be removed before 2.0
-  public static double estimateJoinedRows(
-      Join joinRel,
-      RexNode condition) {
-    final RelMetadataQuery mq = RelMetadataQuery.instance();
-    return Util.first(RelMdUtil.getJoinRowCount(mq, joinRel, condition), 1D);
-  }
-
-  @Override public double estimateRowCount(RelMetadataQuery mq) {
-    return Util.first(RelMdUtil.getJoinRowCount(mq, this, condition), 1D);
-  }
-
-  @Override public Set<CorrelationId> getVariablesSet() {
-    return variablesSet;
-  }
-
-  @Override public RelWriter explainTerms(RelWriter pw) {
-    return super.explainTerms(pw)
-        .item("condition", condition)
-        .item("joinType", joinType.lowerName)
-        .itemIf(
-            "systemFields",
-            getSystemFieldList(),
-            !getSystemFieldList().isEmpty());
-  }
-
-  @Override protected RelDataType deriveRowType() {
-    assert getSystemFieldList() != null;
-    RelDataType leftType = left.getRowType();
-    RelDataType rightType = right.getRowType();
-    RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
-    switch (joinType) {
-      case LEFT:
-        rightType = typeFactory.createTypeWithNullability(rightType, true);
-        break;
-      case RIGHT:
-        leftType = typeFactory.createTypeWithNullability(leftType, true);
-        break;
-      case FULL:
-        leftType = typeFactory.createTypeWithNullability(leftType, true);
-        rightType = typeFactory.createTypeWithNullability(rightType, true);
-        break;
-      case SEMI:
-      case ANTI:
-        rightType = null;
-      default:
-        break;
-    }
-    return createJoinType(typeFactory, leftType, rightType, null, getSystemFieldList());
-  }
-
-  /**
-   * Returns whether this LogicalJoin has already spawned a
-   * {@code SemiJoin} via
-   * {@link org.apache.calcite.rel.rules.JoinAddRedundantSemiJoinRule}.
-   *
-   * <p>The base implementation returns false.</p>
-   *
-   * @return whether this join has already spawned a semi join
-   */
-  public boolean isSemiJoinDone() {
-    return false;
-  }
-
-  /**
-   * Returns whether this Join is a semijoin.
-   *
-   * @return true if this Join's join type is semi.
-   */
-  public boolean isSemiJoin() {
-    return joinType == JoinRelType.SEMI;
-  }
-
-  /**
-   * Returns a list of system fields that will be prefixed to
-   * output row type.
-   *
-   * @return list of system fields
-   */
-  public List<RelDataTypeField> getSystemFieldList() {
-    return Collections.emptyList();
-  }
-
-  @Deprecated // to be removed before 2.0
-  public static RelDataType deriveJoinRowType(
-      RelDataType leftType,
-      RelDataType rightType,
-      JoinRelType joinType,
-      RelDataTypeFactory typeFactory,
-      List<String> fieldNameList,
-      List<RelDataTypeField> systemFieldList) {
-    return SqlValidatorUtil.deriveJoinRowType(leftType, rightType, joinType,
-        typeFactory, fieldNameList, systemFieldList);
-  }
-
-  @Deprecated // to be removed before 2.0
-  public static RelDataType createJoinType(
-      RelDataTypeFactory typeFactory,
-      RelDataType leftType,
-      RelDataType rightType,
-      List<String> fieldNameList,
-      List<RelDataTypeField> systemFieldList) {
-    return SqlValidatorUtil.createJoinType(typeFactory, leftType, rightType,
-        fieldNameList, systemFieldList);
-  }
-
-  @Override public final Join copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    assert inputs.size() == 2;
-    return copy(traitSet, getCondition(), inputs.get(0), inputs.get(1),
-        joinType, isSemiJoinDone());
-  }
-
-  /**
-   * Creates a copy of this join, overriding condition, system fields and
-   * inputs.
-   *
-   * <p>General contract as {@link RelNode#copy}.
-   *
-   * @param traitSet      Traits
-   * @param conditionExpr Condition
-   * @param left          Left input
-   * @param right         Right input
-   * @param joinType      Join type
-   * @param semiJoinDone  Whether this join has been translated to a
-   *                      semi-join
-   * @return Copy of this join
-   */
-  public abstract Join copy(RelTraitSet traitSet, RexNode conditionExpr,
-      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone);
-
-  /**
-   * Analyzes the join condition.
-   *
-   * @return Analyzed join condition
-   */
-  public JoinInfo analyzeCondition() {
-    return joinInfo;
-  }
-}
-
-// End Join.java
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
deleted file mode 100644
index 8e69d15..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import org.apache.calcite.linq4j.CorrelateJoinType;
-
-import java.util.Locale;
-
-/**
- * This class is copied from https://github.com/apache/calcite/pull/1157 to supports SEMI/ANTI join.
- * NOTES: This file should be deleted when upgrading to a new calcite version
- * which contains CALCITE-2969.
- */
-
-/**
- * Enumeration of join types.
- */
-public enum JoinRelType {
-  /**
-   * Inner join.
-   */
-  INNER,
-
-  /**
-   * Left-outer join.
-   */
-  LEFT,
-
-  /**
-   * Right-outer join.
-   */
-  RIGHT,
-
-  /**
-   * Full-outer join.
-   */
-  FULL,
-
-  /**
-   * Semi-join.
-   *
-   * <p>For example, {@code EMP semi-join DEPT} finds all {@code EMP} records
-   * that have a corresponding {@code DEPT} record:
-   *
-   * <blockquote><pre>
-   * SELECT * FROM EMP
-   * WHERE EXISTS (SELECT 1 FROM DEPT
-   *     WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
-   * </blockquote>
-   */
-  SEMI,
-
-  /**
-   * Anti-join.
-   *
-   * <p>For example, {@code EMP anti-join DEPT} finds all {@code EMP} records
-   * that do not have a corresponding {@code DEPT} record:
-   *
-   * <blockquote><pre>
-   * SELECT * FROM EMP
-   * WHERE NOT EXISTS (SELECT 1 FROM DEPT
-   *     WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
-   * </blockquote>
-   */
-  ANTI;
-
-  /** Lower-case name. */
-  public final String lowerName = name().toLowerCase(Locale.ROOT);
-
-  /**
-   * Returns whether a join of this type may generate NULL values on the
-   * right-hand side.
-   */
-  public boolean generatesNullsOnRight() {
-    return (this == LEFT) || (this == FULL);
-  }
-
-  /**
-   * Returns whether a join of this type may generate NULL values on the
-   * left-hand side.
-   */
-  public boolean generatesNullsOnLeft() {
-    return (this == RIGHT) || (this == FULL);
-  }
-
-  /**
-   * Swaps left to right, and vice versa.
-   */
-  public JoinRelType swap() {
-    switch (this) {
-    case LEFT:
-      return RIGHT;
-    case RIGHT:
-      return LEFT;
-    default:
-      return this;
-    }
-  }
-
-  /** Returns whether this join type generates nulls on side #{@code i}. */
-  public boolean generatesNullsOn(int i) {
-    switch (i) {
-    case 0:
-      return generatesNullsOnLeft();
-    case 1:
-      return generatesNullsOnRight();
-    default:
-      throw new IllegalArgumentException("invalid: " + i);
-    }
-  }
-
-  /** Returns a join type similar to this but that does not generate nulls on
-   * the left. */
-  public JoinRelType cancelNullsOnLeft() {
-    switch (this) {
-    case RIGHT:
-      return INNER;
-    case FULL:
-      return LEFT;
-    default:
-      return this;
-    }
-  }
-
-  /** Returns a join type similar to this but that does not generate nulls on
-   * the right. */
-  public JoinRelType cancelNullsOnRight() {
-    switch (this) {
-    case LEFT:
-      return INNER;
-    case FULL:
-      return RIGHT;
-    default:
-      return this;
-    }
-  }
-
-  /** Transform this JoinRelType to CorrelateJoinType. **/
-  public CorrelateJoinType toLinq4j() {
-    switch (this) {
-    case INNER:
-      return CorrelateJoinType.INNER;
-    case LEFT:
-      return CorrelateJoinType.LEFT;
-    case SEMI:
-      return CorrelateJoinType.SEMI;
-    case ANTI:
-      return CorrelateJoinType.ANTI;
-    }
-    throw new IllegalStateException(
-        "Unable to convert " + this + " to CorrelateJoinType");
-  }
-
-  public boolean projectsRight() {
-    switch (this) {
-    case INNER:
-    case LEFT:
-    case RIGHT:
-    case FULL:
-      return true;
-    case SEMI:
-    case ANTI:
-      return false;
-    }
-    throw new IllegalStateException(
-        "Unable to convert " + this + " to JoinRelType");
-  }
-}
-
-// End JoinRelType.java
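
The javadoc of the deleted enum above already gives the EXISTS / NOT EXISTS formulations of
SEMI and ANTI. As a hand-rolled illustration of the same semantics (plain Java collections,
not the Calcite API): a semi join keeps the left rows that have a match, an anti join keeps
the left rows without one, and neither emits right-hand columns.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class SemiAntiJoinDemo {
        public static void main(String[] args) {
            List<Integer> empDeptNos = Arrays.asList(10, 20, 30);            // EMP.DEPTNO values
            Set<Integer> deptDeptNos = new HashSet<>(Arrays.asList(10, 20)); // DEPT.DEPTNO values

            List<Integer> semi = empDeptNos.stream()           // WHERE EXISTS (...)
                .filter(deptDeptNos::contains)
                .collect(Collectors.toList());                 // [10, 20]
            List<Integer> anti = empDeptNos.stream()           // WHERE NOT EXISTS (...)
                .filter(d -> !deptDeptNos.contains(d))
                .collect(Collectors.toList());                 // [30]

            System.out.println("semi=" + semi + ", anti=" + anti);
        }
    }
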
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index cef2de0..d260514 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -79,7 +79,6 @@ import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.rex.RexSubQuery;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitorImpl;
-import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlFunction;
@@ -119,7 +118,7 @@ import java.util.TreeMap;
 
 /**
  *  This class is copied from Apache Calcite except that it supports SEMI/ANTI join.
- *  NOTES: This file should be deleted when upgrading to a new calcite version which contains CALCITE-2969.
+ *  NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are fixed.
  */
 
 /**
@@ -263,6 +262,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       // has been rewritten; apply rules post-decorrelation
       final HepProgram program2 = HepProgram.builder()
           .addRuleInstance(
+              // use FilterJoinRule instead of FlinkFilterJoinRule once CALCITE-3170 is fixed
               new FlinkFilterJoinRule.FlinkFilterIntoJoinRule(
                   true, f,
                   FlinkFilterJoinRule.TRUE_PREDICATE))
@@ -474,8 +474,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
     final RelNode newInput = frame.r;
 
+    // aggregate outputs mapping: group keys and aggregates
+    final Map<Integer, Integer> outputMap = new HashMap<>();
+
     // map from newInput
-    Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>();
+    final Map<Integer, Integer> mapNewInputToProjOutputs = new HashMap<>();
     final int oldGroupKeyCount = rel.getGroupSet().cardinality();
 
     // Project projects the original expressions,
@@ -497,6 +500,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
         omittedConstants.put(i, constant);
         continue;
       }
+
+      // add mapping of group keys.
+      outputMap.put(i, newPos);
       int newInputPos = frame.oldToNewOutputs.get(i);
       projects.add(RexInputRef.of2(newInputPos, newInputOutput));
       mapNewInputToProjOutputs.put(newInputPos, newPos);
@@ -600,7 +606,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
       // The old to new output position mapping will be the same as that
       // of newProject, plus any aggregates that the oldAgg produces.
-      combinedMap.put(
+      outputMap.put(
           oldInputOutputFieldCount + i,
           newInputOutputFieldCount + i);
     }
@@ -612,15 +618,37 @@ public class RelDecorrelator implements ReflectiveVisitor {
       final List<RexNode> postProjects = new ArrayList<>(relBuilder.fields());
       for (Map.Entry<Integer, RexLiteral> entry
           : omittedConstants.descendingMap().entrySet()) {
-        postProjects.add(entry.getKey() + frame.corDefOutputs.size(),
-            entry.getValue());
+        int index = entry.getKey() + frame.corDefOutputs.size();
+        postProjects.add(index, entry.getValue());
+        // Shift the outputs whose index is equal to or greater than the added index
+        // by an offset of 1.
+        shiftMapping(outputMap, index, 1);
+        // Then add the constant key mapping.
+        outputMap.put(entry.getKey(), index);
       }
       relBuilder.project(postProjects);
     }
 
     // Aggregate does not change input ordering so corVars will be
     // located at the same position as the input newProject.
-    return register(rel, relBuilder.build(), combinedMap, corDefOutputs);
+    return register(rel, relBuilder.build(), outputMap, corDefOutputs);
+  }
+
+  /**
+   * Shifts the mapping by a fixed offset, starting from {@code startIndex}.
+   * @param mapping    the original mapping
+   * @param startIndex any output whose index is equal to or greater than the start index
+   *                   will be shifted
+   * @param offset     shift offset
+   */
+  private static void shiftMapping(Map<Integer, Integer> mapping, int startIndex, int offset) {
+    for (Map.Entry<Integer, Integer> entry : mapping.entrySet()) {
+      if (entry.getValue() >= startIndex) {
+        mapping.put(entry.getKey(), entry.getValue() + offset);
+      } else {
+        mapping.put(entry.getKey(), entry.getValue());
+      }
+    }
   }
 
   public Frame getInvoke(RelNode r, RelNode parent) {
@@ -1162,7 +1190,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         RexUtil.composeConjunction(relBuilder.getRexBuilder(), conditions);
     RelNode newJoin =
         LogicalJoin.create(leftFrame.r, rightFrame.r, condition,
-            ImmutableSet.of(), toJoinRelType(rel.getJoinType()));
+            ImmutableSet.of(), rel.getJoinType());
 
     return register(rel, newJoin, mapOldToNewOutputs, corDefOutputs);
   }
@@ -1173,6 +1201,14 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @param rel Join
    */
   public Frame decorrelateRel(LogicalJoin rel) {
+    // For SEMI/ANTI join, decorrelate its input directly,
+    // because the correlate variables can only be propagated from
+    // the left side, which is not supported yet.
+    if (!rel.getJoinType().projectsRight()) {
+      // fix CALCITE-3169
+      return decorrelateRel((RelNode) rel);
+    }
+
     //
     // Rewrite logic:
     //
@@ -1180,10 +1216,6 @@ public class RelDecorrelator implements ReflectiveVisitor {
     // 2. map output positions and produce corVars if any.
     //
 
-    if (!rel.getJoinType().projectsRight()) {
-      return decorrelateRel((RelNode) rel);
-    }
-
     final RelNode oldLeft = rel.getInput(0);
     final RelNode oldRight = rel.getInput(1);
 
@@ -1335,21 +1367,6 @@ public class RelDecorrelator implements ReflectiveVisitor {
         .build();
   }
 
-  private JoinRelType toJoinRelType(SemiJoinType semiJoinType) {
-    switch (semiJoinType) {
-      case INNER:
-        return JoinRelType.INNER;
-      case LEFT:
-        return JoinRelType.LEFT;
-      case SEMI:
-        return JoinRelType.SEMI;
-      case ANTI:
-        return JoinRelType.ANTI;
-      default:
-        throw new IllegalArgumentException("Unsupported type: " + semiJoinType);
-    }
-  }
-
   /**
    * Pulls a {@link Project} above a {@link Correlate} from its RHS input.
    * Enforces nullability for join output.
@@ -1365,7 +1382,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       LogicalProject project,
       Set<Integer> isCount) {
     final RelNode left = correlate.getLeft();
-    final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+    final JoinRelType joinType = correlate.getJoinType();
 
     // now create the new project
     final List<Pair<RexNode, String>> newProjects = new ArrayList<>();
@@ -1867,7 +1884,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       //   Aggregate (groupby (0) single_value())
       //     Project-A (may reference corVar)
       //       rightInput
-      final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+      final JoinRelType joinType = correlate.getJoinType();
 
       // corRel.getCondition was here, however Correlate was updated so it
       // never includes a join condition. The code was not modified for brevity.
@@ -2077,7 +2094,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      final JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+      final JoinRelType joinType = correlate.getJoinType();
       // corRel.getCondition was here, however Correlate was updated so it
       // never includes a join condition. The code was not modified for brevity.
       RexNode joinCond = rexBuilder.makeLiteral(true);
@@ -2482,7 +2499,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      JoinRelType joinType = toJoinRelType(correlate.getJoinType());
+      JoinRelType joinType = correlate.getJoinType();
       // corRel.getCondition was here, however Correlate was updated so it
       // never includes a join condition. The code was not modified for brevity.
       RexNode joinCond = relBuilder.literal(true);
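
The shiftMapping helper added above bumps every mapped output position that sits at or after
the index where an omitted constant is re-inserted by the post-project. A standalone
illustration of that behavior (it mirrors the logic shown in the hunk; the class name and the
sample positions are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class ShiftMappingDemo {
        // Same idea as RelDecorrelator#shiftMapping: every value >= startIndex moves by offset.
        static void shiftMapping(Map<Integer, Integer> mapping, int startIndex, int offset) {
            for (Map.Entry<Integer, Integer> entry : mapping.entrySet()) {
                if (entry.getValue() >= startIndex) {
                    entry.setValue(entry.getValue() + offset);
                }
            }
        }

        public static void main(String[] args) {
            Map<Integer, Integer> outputMap = new HashMap<>();
            outputMap.put(0, 0);
            outputMap.put(2, 1);
            outputMap.put(3, 2);

            // Re-adding a constant at output position 1 shifts the later outputs by one,
            // after which the constant itself is mapped to that position.
            shiftMapping(outputMap, 1, 1);
            outputMap.put(1, 1);

            System.out.println(outputMap); // {0=0, 1=1, 2=2, 3=3}
        }
    }
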
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
index bd8306f..dc5b918 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FunctionCatalogOperatorTable.java
@@ -33,6 +33,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
 
 import java.util.List;
 import java.util.Optional;
@@ -58,7 +59,8 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 			SqlIdentifier opName,
 			SqlFunctionCategory category,
 			SqlSyntax syntax,
-			List<SqlOperator> operatorList) {
+			List<SqlOperator> operatorList,
+			SqlNameMatcher nameMatcher) {
 		if (!opName.isSimple()) {
 			return;
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index 12af928..d3d2e5a 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -29,9 +29,11 @@ import org.apache.calcite.sql.SqlBinaryOperator;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlGroupedWindowFunction;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
@@ -42,6 +44,8 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeTransforms;
 import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.sql.validate.SqlNameMatcher;
+import org.apache.calcite.sql.validate.SqlNameMatchers;
 
 import java.util.Arrays;
 import java.util.List;
@@ -89,6 +93,17 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		});
 	}
 
+	@Override
+	public void lookupOperatorOverloads(
+			SqlIdentifier opName,
+			SqlFunctionCategory category,
+			SqlSyntax syntax,
+			List<SqlOperator> operatorList,
+			SqlNameMatcher nameMatcher) {
+		// set caseSensitive=false to keep the behavior the same as before.
+		super.lookupOperatorOverloads(opName, category, syntax, operatorList, SqlNameMatchers.withCaseSensitive(false));
+	}
+
 	// -----------------------------------------------------------------------------
 	// Flink specific built-in scalar SQL functions
 	// -----------------------------------------------------------------------------
@@ -1131,7 +1146,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction CURRENT_TIMESTAMP = SqlStdOperatorTable.CURRENT_TIMESTAMP;
 	public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
 	public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
-	public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
 	public static final SqlOperator SCALAR_QUERY = SqlStdOperatorTable.SCALAR_QUERY;
 	public static final SqlOperator EXISTS = SqlStdOperatorTable.EXISTS;
 	public static final SqlFunction SIN = SqlStdOperatorTable.SIN;
@@ -1148,9 +1162,21 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction PI = SqlStdOperatorTable.PI;
 	public static final SqlFunction RAND = SqlStdOperatorTable.RAND;
 	public static final SqlFunction RAND_INTEGER = SqlStdOperatorTable.RAND_INTEGER;
+	public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
+
+	// TIME FUNCTIONS
+	public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
+	public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
+	public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH;
+	public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK;
+	public static final SqlFunction HOUR = SqlStdOperatorTable.HOUR;
+	public static final SqlFunction MINUTE = SqlStdOperatorTable.MINUTE;
+	public static final SqlFunction SECOND = SqlStdOperatorTable.SECOND;
+	public static final SqlFunction DAYOFYEAR = SqlStdOperatorTable.DAYOFYEAR;
+	public static final SqlFunction DAYOFMONTH = SqlStdOperatorTable.DAYOFMONTH;
+	public static final SqlFunction DAYOFWEEK = SqlStdOperatorTable.DAYOFWEEK;
 	public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
 	public static final SqlFunction TIMESTAMP_DIFF = SqlStdOperatorTable.TIMESTAMP_DIFF;
-	public static final SqlFunction TRUNCATE = SqlStdOperatorTable.TRUNCATE;
 
 	// MATCH_RECOGNIZE
 	public static final SqlFunction FIRST = SqlStdOperatorTable.FIRST;
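
The lookupOperatorOverloads override added above forwards to the reflective base table with
SqlNameMatchers.withCaseSensitive(false), keeping function lookup case-insensitive now that
Calcite 1.20 threads a SqlNameMatcher through the operator tables. A minimal sketch of what
that matcher does (the probe class is illustrative, not part of this commit):

    import org.apache.calcite.sql.validate.SqlNameMatcher;
    import org.apache.calcite.sql.validate.SqlNameMatchers;

    public class NameMatcherProbe {
        public static void main(String[] args) {
            SqlNameMatcher caseInsensitive = SqlNameMatchers.withCaseSensitive(false);
            // Newly exposed time functions such as DAYOFWEEK resolve regardless of casing.
            System.out.println(caseInsensitive.matches("dayofweek", "DAYOFWEEK")); // true
            System.out.println(SqlNameMatchers.withCaseSensitive(true)
                .matches("dayofweek", "DAYOFWEEK"));                               // false
        }
    }
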
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index 3b593de..3513ba7 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.rules.logical;
 
-import org.apache.flink.table.plan.util.FlinkRelOptUtil;
-
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -42,12 +40,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
-import static org.apache.calcite.plan.RelOptUtil.conjunctions;
-
 /**
  * This rules is copied from Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule}.
+ * NOTES: This file should be deleted when CALCITE-3170 is fixed.
  * Modification:
- * - Use `FlinkRelOptUtil.classifyFilters` to support SEMI/ANTI join
  * - Handles the ON condition of anti-join can not be pushed down
  */
 
@@ -140,7 +136,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 
 		final List<RexNode> aboveFilters =
 				filter != null
-						? conjunctions(filter.getCondition())
+						? RelOptUtil.conjunctions(filter.getCondition())
 						: new ArrayList<>();
 		final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =
 				com.google.common.collect.ImmutableList.copyOf(aboveFilters);
@@ -150,7 +146,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 		if (smart
 				&& !origAboveFilters.isEmpty()
 				&& join.getJoinType() != JoinRelType.INNER) {
-			joinType = FlinkRelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+			joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
 		}
 
 		final List<RexNode> leftFilters = new ArrayList<>();
@@ -166,7 +162,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 		// filters. They can be pushed down if they are not on the NULL
 		// generating side.
 		boolean filterPushed = false;
-		if (FlinkRelOptUtil.classifyFilters(
+		if (RelOptUtil.classifyFilters(
 				join,
 				aboveFilters,
 				joinType,
@@ -198,7 +194,7 @@ public abstract class FlinkFilterJoinRule extends RelOptRule {
 		// pushed down if it does not affect the non-matching set, i.e. it is
 		// not on the side which is preserved.
 		// A ON clause filter of anti-join can not be pushed down.
-		if (!isAntiJoin && FlinkRelOptUtil.classifyFilters(
+		if (!isAntiJoin && RelOptUtil.classifyFilters(
 				join,
 				joinFilters,
 				joinType,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
index a404e3f..2847a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/SubQueryDecorrelator.java
@@ -29,7 +29,6 @@ import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
-import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Project;
@@ -563,10 +562,6 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
 		 * @param rel Aggregate to rewrite
 		 */
 		public Frame decorrelateRel(LogicalAggregate rel) {
-			if (rel.getGroupType() != Aggregate.Group.SIMPLE) {
-				throw new AssertionError(Bug.CALCITE_461_FIXED);
-			}
-
 			// Aggregate itself should not reference corVars.
 			assert !cm.mapRefRelToCorRef.containsKey(rel);
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
index c60bb12..11b0681 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkLogicalRelFactories.scala
@@ -32,8 +32,8 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelCollation, RelNode}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.SqlKind.{EXCEPT, INTERSECT, UNION}
-import org.apache.calcite.sql.{SemiJoinType, SqlKind}
 import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
 import org.apache.calcite.util.ImmutableBitSet
 
@@ -139,14 +139,12 @@ object FlinkLogicalRelFactories {
     * Implementation of [[AggregateFactory]] that returns a [[FlinkLogicalAggregate]].
     */
   class AggregateFactoryImpl extends AggregateFactory {
-    @SuppressWarnings(Array("deprecation"))
     def createAggregate(
         input: RelNode,
-        indicator: Boolean,
         groupSet: ImmutableBitSet,
         groupSets: ImmutableList[ImmutableBitSet],
         aggCalls: util.List[AggregateCall]): RelNode = {
-      FlinkLogicalAggregate.create(input, indicator, groupSet, groupSets, aggCalls)
+      FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls)
     }
   }
 
@@ -206,7 +204,7 @@ object FlinkLogicalRelFactories {
         right: RelNode,
         correlationId: CorrelationId,
         requiredColumns: ImmutableBitSet,
-        joinType: SemiJoinType): RelNode = {
+        joinType: JoinRelType): RelNode = {
       FlinkLogicalCorrelate.create(left, right, correlationId, requiredColumns, joinType)
     }
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
index 7091846..89f0a4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CorrelateCodeGenerator.scala
@@ -42,8 +42,8 @@ import org.apache.flink.table.types.{DataType, PlannerTypeUtils}
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.SemiJoinType
 
 import scala.collection.JavaConversions._
 
@@ -58,7 +58,7 @@ object CorrelateCodeGenerator {
       scan: FlinkLogicalTableFunctionScan,
       condition: Option[RexNode],
       outDataType: RelDataType,
-      joinType: SemiJoinType,
+      joinType: JoinRelType,
       parallelism: Int,
       retainHeader: Boolean,
       expression: (RexNode, List[String], Option[List[RexNode]]) => String,
@@ -153,7 +153,7 @@ object CorrelateCodeGenerator {
       swallowInputOnly: Boolean = false,
       udtfType: LogicalType,
       returnType: RowType,
-      joinType: SemiJoinType,
+      joinType: JoinRelType,
       rexCall: RexCall,
       pojoFieldMapping: Option[Array[Int]],
       ruleDescription: String,
@@ -192,7 +192,7 @@ object CorrelateCodeGenerator {
          |""".stripMargin
 
     // 3. left join
-    if (joinType == SemiJoinType.LEFT) {
+    if (joinType == JoinRelType.LEFT) {
       if (swallowInputOnly) {
         // and the returned row table function is empty, collect a null
         val nullRowTerm = CodeGenUtils.newName("nullRow")
@@ -267,8 +267,8 @@ object CorrelateCodeGenerator {
              |""".stripMargin
 
         }
-    } else if (joinType != SemiJoinType.INNER) {
-      throw new TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+    } else if (joinType != JoinRelType.INNER) {
+      throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.")
     }
 
     val genOperator = OperatorCodeGenerator.generateOneInputStreamOperator[BaseRow, BaseRow](
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 7993d67..0e5dcd6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -40,7 +40,6 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.metadata._
 import org.apache.calcite.rel.{RelNode, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SemiJoinType._
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
@@ -583,8 +582,9 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
     rel.getJoinType match {
-      case ANTI | SEMI => mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
-      case LEFT | INNER =>
+      case JoinRelType.ANTI | JoinRelType.SEMI =>
+        mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
+      case JoinRelType.LEFT | JoinRelType.INNER =>
         val left = rel.getLeft
         val right = rel.getRight
         val leftFieldCount = left.getRowType.getFieldCount
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
index be8288e..b96faf4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -42,7 +42,6 @@ final class LogicalWindowAggregate(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): Aggregate = {
@@ -74,7 +73,7 @@ object LogicalWindowAggregate {
       window: LogicalWindow,
       namedProperties: Seq[PlannerNamedWindowProperty],
       agg: Aggregate): LogicalWindowAggregate = {
-    require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
+    require(agg.getGroupType == Group.SIMPLE)
     val cluster: RelOptCluster = agg.getCluster
     val traitSet: RelTraitSet = cluster.traitSetOf(Convention.NONE)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
index e206b65..433590b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
@@ -48,7 +48,6 @@ abstract class WindowAggregate(
     cluster,
     traitSet,
     child,
-    false,
     groupSet,
     ImmutableList.of(groupSet),
     aggCalls) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
index 65fec34..4a0c303 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala
@@ -64,11 +64,10 @@ abstract class CommonPhysicalJoin(
   lazy val inputRowType: RelDataType = joinType match {
     case JoinRelType.SEMI | JoinRelType.ANTI =>
       // Combines inputs' RowType, the result is different from SEMI/ANTI Join's RowType.
-      SqlValidatorUtil.deriveJoinRowType(
+      SqlValidatorUtil.createJoinType(
+        getCluster.getTypeFactory,
         getLeft.getRowType,
         getRight.getRowType,
-        getJoinType,
-        getCluster.getTypeFactory,
         null,
         Collections.emptyList[RelDataTypeField]
       )
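
As the comment in the hunk above notes, a SEMI/ANTI physical join still needs the
concatenation of both inputs' row types for its join condition, even though the join's own
output type only contains the left fields, so the code switches to
SqlValidatorUtil.createJoinType, which concatenates the two row types independent of the
join type. A small sketch of that call (the type factory setup and field names are
illustrative, not part of this commit):

    import java.util.Collections;

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.calcite.sql.validate.SqlValidatorUtil;

    public class CreateJoinTypeDemo {
        public static void main(String[] args) {
            RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
            RelDataType left = typeFactory.builder().add("id", SqlTypeName.INTEGER).build();
            RelDataType right = typeFactory.builder().add("name", SqlTypeName.VARCHAR, 20).build();

            // createJoinType simply appends the right fields after the left ones.
            RelDataType combined = SqlValidatorUtil.createJoinType(
                typeFactory, left, right, null, Collections.emptyList());
            System.out.println(combined.getFieldNames()); // [id, name]
        }
    }
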
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
index 70fc662..52d3c41 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -43,13 +43,12 @@ class FlinkLogicalAggregate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     child: RelNode,
-    indicator: Boolean,
     groupSet: ImmutableBitSet,
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     /* flag indicating whether to skip SplitAggregateRule */
     var partialFinalType: PartialFinalType = PartialFinalType.NONE)
-  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  extends Aggregate(cluster, traitSet, child, groupSet, groupSets, aggCalls)
   with FlinkLogicalRel {
 
   def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -59,12 +58,11 @@ class FlinkLogicalAggregate(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): Aggregate = {
     new FlinkLogicalAggregate(
-      cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls, partialFinalType)
+      cluster, traitSet, input, groupSet, groupSets, aggCalls, partialFinalType)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
@@ -113,7 +111,6 @@ private class FlinkLogicalAggregateBatchConverter
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
     FlinkLogicalAggregate.create(
       newInput,
-      agg.indicator,
       agg.getGroupSet,
       agg.getGroupSets,
       agg.getAggCallList)
@@ -143,7 +140,6 @@ private class FlinkLogicalAggregateStreamConverter
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
     FlinkLogicalAggregate.create(
       newInput,
-      agg.indicator,
       agg.getGroupSet,
       agg.getGroupSets,
       agg.getAggCallList)
@@ -156,12 +152,11 @@ object FlinkLogicalAggregate {
 
   def create(
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
     val cluster = input.getCluster
     val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
-    new FlinkLogicalAggregate(cluster,traitSet, input, indicator, groupSet, groupSets, aggCalls)
+    new FlinkLogicalAggregate(cluster,traitSet, input, groupSet, groupSets, aggCalls)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
index 54bf96c..3c06542 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -23,9 +23,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{Correlate, CorrelationId}
+import org.apache.calcite.rel.core.{Correlate, CorrelationId, JoinRelType}
 import org.apache.calcite.rel.logical.LogicalCorrelate
-import org.apache.calcite.sql.SemiJoinType
 import org.apache.calcite.util.ImmutableBitSet
 
 /**
@@ -39,7 +38,7 @@ class FlinkLogicalCorrelate(
     right: RelNode,
     correlationId: CorrelationId,
     requiredColumns: ImmutableBitSet,
-    joinType: SemiJoinType)
+    joinType: JoinRelType)
   extends Correlate(cluster, traitSet, left, right, correlationId, requiredColumns, joinType)
   with FlinkLogicalRel {
 
@@ -49,7 +48,7 @@ class FlinkLogicalCorrelate(
       right: RelNode,
       correlationId: CorrelationId,
       requiredColumns: ImmutableBitSet,
-      joinType: SemiJoinType): Correlate = {
+      joinType: JoinRelType): Correlate = {
 
     new FlinkLogicalCorrelate(
       cluster,
@@ -91,7 +90,7 @@ object FlinkLogicalCorrelate {
       right: RelNode,
       correlationId: CorrelationId,
       requiredColumns: ImmutableBitSet,
-      joinType: SemiJoinType): FlinkLogicalCorrelate = {
+      joinType: JoinRelType): FlinkLogicalCorrelate = {
     val cluster = left.getCluster
     val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
     new FlinkLogicalCorrelate(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 2b96e44..41074bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -18,20 +18,20 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.FlinkConventions
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rel.metadata.RelColumnMapping
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode, RexUtil}
-import org.apache.calcite.sql.SemiJoinType
 import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.table.functions.TemporalTableFunction
-import org.apache.flink.table.functions.utils.TableSqlFunction
 
 import java.lang.reflect.Type
 import java.util
@@ -162,7 +162,7 @@ class FlinkLogicalTableFunctionScanConverter
       newScan,
       cluster.createCorrel(), // a dummy CorrelationId
       ImmutableBitSet.of(),
-      SemiJoinType.INNER)
+      JoinRelType.INNER)
   }
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index ebea49c..383cdd1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -50,7 +50,6 @@ class FlinkLogicalWindowAggregate(
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
-      indicator: Boolean,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
       aggCalls: util.List[AggregateCall]): Aggregate = {
@@ -99,7 +98,7 @@ class FlinkLogicalWindowAggregateConverter
 
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalWindowAggregate]
-    require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
+    require(agg.getGroupType == Group.SIMPLE)
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
     val traitSet = newInput.getCluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
index cd792fa..8e3c759 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -30,10 +30,10 @@ import org.apache.flink.table.planner.BatchPlanner
 
 import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Correlate
+import org.apache.calcite.rel.core.{Correlate, JoinRelType}
 import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, RelFieldCollation, RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
-import org.apache.calcite.sql.{SemiJoinType, SqlKind}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
 
 import java.util
@@ -51,12 +51,12 @@ class BatchExecCorrelate(
     condition: Option[RexNode],
     projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
-    joinType: SemiJoinType)
+    joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
   with BatchPhysicalRel
   with BatchExecNode[BaseRow] {
 
-  require(joinType == SemiJoinType.INNER || joinType == SemiJoinType.LEFT)
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
 
   override def deriveRowType(): RelDataType = outputRowType
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
index 9b28572..d3324ac 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -29,9 +29,9 @@ import org.apache.flink.table.runtime.AbstractProcessStreamOperator
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
-import org.apache.calcite.sql.SemiJoinType
 
 import java.util
 
@@ -48,12 +48,12 @@ class StreamExecCorrelate(
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
     outputRowType: RelDataType,
-    joinType: SemiJoinType)
+    joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
   with StreamPhysicalRel
   with StreamExecNode[BaseRow] {
 
-  require(joinType == SemiJoinType.INNER || joinType == SemiJoinType.LEFT)
+  require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
 
   override def producesUpdates: Boolean = false
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
index 96da7b4..edc15e3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
@@ -74,7 +74,6 @@ object FlinkBatchProgram {
     )
 
     // rewrite special temporal join plan
-    // TODO remove this program after upgraded to CALCITE-1.20.0 (CALCITE-2004 is fixed)
     chainedProgram.addLast(
       TEMPORAL_JOIN_REWRITE,
       FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
index 1020a69..7fac4cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala
@@ -74,7 +74,6 @@ object FlinkStreamProgram {
         .build())
 
     // rewrite special temporal join plan
-    // TODO remove this program after upgraded to CALCITE-1.20.0 (CALCITE-2004 is fixed)
     chainedProgram.addLast(
       TEMPORAL_JOIN_REWRITE,
       FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 27fc011..38f677e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.table.plan.rules.logical
 
-import org.apache.calcite.plan.RelOptRule.{any, operand, some}
+import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
+import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
 
 /**
   * The initial temporal table join (FOR SYSTEM_TIME AS OF) is a Correlate, rewrite it into a Join
@@ -28,29 +29,51 @@ import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalS
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
   * might be translated into
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
+  *
+  * TODO: support `true` join condition
   */
 class LogicalCorrelateToJoinFromTemporalTableRule
   extends RelOptRule(
-    operand(classOf[LogicalFilter],
-      operand(classOf[LogicalCorrelate], some(
-        operand(classOf[RelNode], any()),
-        operand(classOf[LogicalSnapshot], any())))),
+    operand(classOf[LogicalCorrelate],
+      operand(classOf[RelNode], any()),
+      operand(classOf[LogicalFilter],
+        operand(classOf[LogicalSnapshot], any()))),
     "LogicalCorrelateToJoinFromTemporalTableRule") {
 
   override def onMatch(call: RelOptRuleCall): Unit = {
-    val filterOnCorrelate: LogicalFilter = call.rel(0)
-    val correlate: LogicalCorrelate = call.rel(1)
-    val leftNode: RelNode = call.rel(2)
+    val correlate: LogicalCorrelate = call.rel(0)
+    val leftInput: RelNode = call.rel(1)
+    val filter: LogicalFilter = call.rel(2)
     val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
 
+    val leftRowType = leftInput.getRowType
+    val condition = filter.getCondition
+    val joinCondition = condition.accept(new RexShuttle() {
+      // change correlate variable expression to normal RexInputRef (which is from left side)
+      override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
+        fieldAccess.getReferenceExpr match {
+          case corVar: RexCorrelVariable =>
+            require(correlate.getCorrelationId.equals(corVar.id))
+            val index = leftRowType.getFieldList.indexOf(fieldAccess.getField)
+            RexInputRef.of(index, leftRowType)
+          case _ => super.visitFieldAccess(fieldAccess)
+        }
+      }
+
+      // update the field index from right side
+      override def visitInputRef(inputRef: RexInputRef): RexNode = {
+        val rightIndex = leftRowType.getFieldCount + inputRef.getIndex
+        new RexInputRef(rightIndex, inputRef.getType)
+      }
+    })
+
     val builder = call.builder()
-    builder.push(leftNode)
+    builder.push(leftInput)
     builder.push(snapshot)
-    builder.join(
-      correlate.getJoinType.toJoinType,
-      filterOnCorrelate.getCondition)
+    builder.join(correlate.getJoinType, joinCondition)
 
-    call.transformTo(builder.build())
+    val rel = builder.build()
+    call.transformTo(rel)
   }
 
 }
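
The interesting part of the rewritten rule is the RexShuttle above: a field access through the correlation variable (for example $cor0.a) is replaced by the matching RexInputRef of the left input, while a plain input ref on the snapshot side is shifted by the left row width so that it addresses the right half of the joined row. A compressed sketch of that index arithmetic, using plain case classes rather than Calcite's rex types:

    // Sketch only: the index bookkeeping done by the shuttle in onMatch.
    object CorrelateConditionRewriteSketch {
      sealed trait Ref
      final case class LeftFieldViaCorrelVar(leftIndex: Int) extends Ref // e.g. $cor0.a
      final case class RightInputRef(rightIndex: Int) extends Ref        // ref inside the snapshot

      // Rewrite a ref so it is valid over the joined (left ++ right) row.
      def toJoinRefIndex(ref: Ref, leftFieldCount: Int): Int = ref match {
        case LeftFieldViaCorrelVar(i) => i                  // already a left-side position
        case RightInputRef(i)         => leftFieldCount + i // shift past the left row
      }

      // With a 4-column left input, the snapshot filter =($cor0.a, $0) becomes the
      // join condition =($0, $4), matching the conditions that sat above the
      // correlate in the old LookupJoinTest plans removed further down.
      assert(toJoinRefIndex(LeftFieldViaCorrelVar(0), 4) == 0)
      assert(toJoinRefIndex(RightInputRef(0), 4) == 4)
    }
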
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
index 5b6456b..cf748f1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecCorrelateRule.scala
@@ -36,8 +36,8 @@ class StreamExecCorrelateRule
     "StreamExecCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val join: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
-    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+    val correlate: FlinkLogicalCorrelate = call.rel(0)
+    val right = correlate.getRight.asInstanceOf[RelSubset].getOriginal
 
     // find only calc and table function
     def findTableFunction(calc: FlinkLogicalCalc): Boolean = {
@@ -59,10 +59,11 @@ class StreamExecCorrelateRule
   }
 
   override def convert(rel: RelNode): RelNode = {
-    val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
+    val correlate = rel.asInstanceOf[FlinkLogicalCorrelate]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.STREAM_PHYSICAL)
-    val right: RelNode = join.getInput(1)
+    val convInput: RelNode = RelOptRule.convert(
+      correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL)
+    val right: RelNode = correlate.getInput(1)
 
 
     def getTableScan(calc: FlinkLogicalCalc): RelNode = {
@@ -115,7 +116,7 @@ class StreamExecCorrelateRule
             scan,
             condition,
             rel.getRowType,
-            join.getJoinType)
+            correlate.getJoinType)
       }
     }
     convertToCorrelate(right, None)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
index 1ac0f30..de71cfc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
@@ -23,10 +23,9 @@ import org.apache.flink.table.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
 import org.apache.flink.table.plan.metadata.SelectivityEstimator
 import org.apache.flink.table.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
 
-import com.google.common.collect.{ImmutableList, Lists}
+import com.google.common.collect.Lists
 import org.apache.calcite.config.NullCollation
-import org.apache.calcite.plan.RelOptUtil.InputFinder
-import org.apache.calcite.plan.{RelOptUtil, Strong}
+import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
 import org.apache.calcite.rel.`type`.RelDataTypeField
 import org.apache.calcite.rel.core.{Join, JoinRelType}
@@ -189,196 +188,9 @@ object FlinkRelOptUtil {
   }
 
   /**
-    * Simplifies outer joins if filter above would reject nulls.
-    *
-    * NOTES: This method should be deleted when upgrading to a new calcite version
-    * which contains CALCITE-2969.
-    *
-    * @param joinRel Join
-    * @param aboveFilters Filters from above
-    * @param joinType Join type, can not be inner join
-    */
-  def simplifyJoin(
-      joinRel: RelNode,
-      aboveFilters: ImmutableList[RexNode],
-      joinType: JoinRelType): JoinRelType = {
-    // No need to simplify if only first input output.
-    if (!joinType.projectsRight()) {
-      return joinType
-    }
-    val nTotalFields = joinRel.getRowType.getFieldCount
-    val nSysFields = 0
-    val nFieldsLeft = joinRel.getInputs.get(0).getRowType.getFieldCount
-    val nFieldsRight = joinRel.getInputs.get(1).getRowType.getFieldCount
-    assert(nTotalFields == nSysFields + nFieldsLeft + nFieldsRight)
-
-    // set the reference bitmaps for the left and right children
-    val leftBitmap = ImmutableBitSet.range(nSysFields, nSysFields + nFieldsLeft)
-    val rightBitmap = ImmutableBitSet.range(nSysFields + nFieldsLeft, nTotalFields)
-
-    var result = joinType
-    for (filter <- aboveFilters) {
-      if (joinType.generatesNullsOnLeft && Strong.isNotTrue(filter, leftBitmap)) {
-        result = result.cancelNullsOnLeft
-      }
-      if (joinType.generatesNullsOnRight && Strong.isNotTrue(filter, rightBitmap)) {
-        result = result.cancelNullsOnRight
-      }
-      if (joinType eq JoinRelType.INNER) {
-        return result
-      }
-    }
-    result
-  }
-
-  /**
-    * Classifies filters according to where they should be processed. They
-    * either stay where they are, are pushed to the join (if they originated
-    * from above the join), or are pushed to one of the children. Filters that
-    * are pushed are added to list passed in as input parameters.
-    *
-    * NOTES: This method should be deleted when upgrading to a new calcite version
-    * which contains CALCITE-2969.
-    *
-    * @param joinRel      join node
-    * @param filters      filters to be classified
-    * @param joinType     join type
-    * @param pushInto     whether filters can be pushed into the ON clause
-    * @param pushLeft     true if filters can be pushed to the left
-    * @param pushRight    true if filters can be pushed to the right
-    * @param joinFilters  list of filters to push to the join
-    * @param leftFilters  list of filters to push to the left child
-    * @param rightFilters list of filters to push to the right child
-    * @return whether at least one filter was pushed
-    */
-  def classifyFilters(
-      joinRel: RelNode,
-      filters: util.List[RexNode],
-      joinType: JoinRelType,
-      pushInto: Boolean,
-      pushLeft: Boolean,
-      pushRight: Boolean,
-      joinFilters: util.List[RexNode],
-      leftFilters: util.List[RexNode],
-      rightFilters: util.List[RexNode]): Boolean = {
-    val rexBuilder = joinRel.getCluster.getRexBuilder
-    val joinFields = joinRel.getRowType.getFieldList
-    val nTotalFields = joinFields.size
-    val nSysFields = 0 // joinRel.getSystemFieldList().size();
-    val leftFields = joinRel.getInputs.get(0).getRowType.getFieldList
-    val nFieldsLeft = leftFields.size
-    val rightFields = joinRel.getInputs.get(1).getRowType.getFieldList
-    val nFieldsRight = rightFields.size
-
-    assert(nTotalFields == (if (joinType.projectsRight()) {
-      nSysFields + nFieldsLeft + nFieldsRight
-    } else {
-      // SEMI/ANTI
-      nSysFields + nFieldsLeft
-    }))
-
-    // set the reference bitmaps for the left and right children
-    val leftBitmap = ImmutableBitSet.range(nSysFields, nSysFields + nFieldsLeft)
-    val rightBitmap = ImmutableBitSet.range(nSysFields + nFieldsLeft, nTotalFields)
-
-    val filtersToRemove = new util.ArrayList[RexNode]
-
-    filters.foreach { filter =>
-      val inputFinder = InputFinder.analyze(filter)
-      val inputBits = inputFinder.inputBitSet.build
-      // REVIEW - are there any expressions that need special handling
-      // and therefore cannot be pushed?
-      // filters can be pushed to the left child if the left child
-      // does not generate NULLs and the only columns referenced in
-      // the filter originate from the left child
-      if (pushLeft && leftBitmap.contains(inputBits)) {
-        // ignore filters that always evaluate to true
-        if (!filter.isAlwaysTrue) {
-          // adjust the field references in the filter to reflect
-          // that fields in the left now shift over by the number
-          // of system fields
-          val shiftedFilter = shiftFilter(
-            nSysFields,
-            nSysFields + nFieldsLeft,
-            -nSysFields,
-            rexBuilder,
-            joinFields,
-            nTotalFields,
-            leftFields,
-            filter)
-          leftFilters.add(shiftedFilter)
-        }
-        filtersToRemove.add(filter)
-
-        // filters can be pushed to the right child if the right child
-        // does not generate NULLs and the only columns referenced in
-        // the filter originate from the right child
-      } else if (pushRight && rightBitmap.contains(inputBits)) {
-        if (!filter.isAlwaysTrue) {
-          // that fields in the right now shift over to the left;
-          // since we never push filters to a NULL generating
-          // child, the types of the source should match the dest
-          // so we don't need to explicitly pass the destination
-          // fields to RexInputConverter
-          val shiftedFilter = shiftFilter(
-            nSysFields + nFieldsLeft,
-            nTotalFields,
-            -(nSysFields + nFieldsLeft),
-            rexBuilder,
-            joinFields,
-            nTotalFields,
-            rightFields,
-            filter)
-          rightFilters.add(shiftedFilter)
-        }
-        filtersToRemove.add(filter)
-      } else {
-        // If the filter can't be pushed to either child and the join
-        // is an inner join, push them to the join if they originated
-        // from above the join
-        if ((joinType eq JoinRelType.INNER) && pushInto) {
-          if (!joinFilters.contains(filter)) {
-            joinFilters.add(filter)
-          }
-          filtersToRemove.add(filter)
-        }
-      }
-    }
-    // Remove filters after the loop, to prevent concurrent modification.
-    if (!filtersToRemove.isEmpty) {
-      filters.removeAll(filtersToRemove)
-    }
-    // Did anything change?
-    !filtersToRemove.isEmpty
-  }
-
-  private def shiftFilter(
-      start: Int,
-      end: Int,
-      offset: Int,
-      rexBuilder: RexBuilder,
-      joinFields: util.List[RelDataTypeField],
-      nTotalFields: Int,
-      rightFields: util.List[RelDataTypeField],
-      filter: RexNode): RexNode = {
-    val adjustments = new Array[Int](nTotalFields)
-    (start until end).foreach {
-      i => adjustments(i) = offset
-    }
-    filter.accept(
-      new RelOptUtil.RexInputConverter(
-        rexBuilder,
-        joinFields,
-        rightFields,
-        adjustments)
-    )
-  }
-
-  /**
     * Pushes down expressions in "equal" join condition.
     *
-    * NOTES: This method should be deleted when upgrading to a new calcite version
-    * which contains CALCITE-2969.
+    * NOTES: This method should be deleted when CALCITE-3171 is fixed.
     *
     * <p>For example, given
     * "emp JOIN dept ON emp.deptno + 1 = dept.deptno", adds a project above
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
index a6746ff..c514ad8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SetOperatorsTest.xml
@@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 HashAggregate(isMerge=[false], groupBy=[c], select=[c])
-+- HashJoin(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
++- HashJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], build=[left])
    :- Exchange(distribution=[hash[c]])
    :  +- Calc(select=[c])
    :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -127,7 +127,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 HashAggregate(isMerge=[false], groupBy=[c], select=[c])
-+- HashJoin(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], build=[left])
++- HashJoin(joinType=[LeftAntiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], build=[left])
    :- Exchange(distribution=[hash[c]])
    :  +- Calc(select=[c])
    :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
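
The only change in these expected plans (and in the similar ones further down) is that the set-operation rewrite rules now emit a single null-safe predicate, IS NOT DISTINCT FROM(c, f), where the plans previously spelled out OR(=(c, f), AND(IS NULL(c), IS NULL(f))). The two forms are equivalent; a minimal sketch of the semantics, modelling SQL NULL with Option:

    // Null-safe equality as encoded by IS NOT DISTINCT FROM (illustrative only).
    def isNotDistinctFrom[A](left: Option[A], right: Option[A]): Boolean =
      (left, right) match {
        case (None, None)       => true   // two NULLs are "not distinct"
        case (Some(l), Some(r)) => l == r // ordinary equality otherwise
        case _                  => false  // NULL vs non-NULL is distinct
      }
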
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
index b0e8f8b..141cbdf 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.xml
@@ -1090,11 +1090,11 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
 :- SortAggregate(isMerge=[false], groupBy=[random], select=[random])
 :  +- Sort(orderBy=[random ASC])
 :     +- Exchange(distribution=[hash[random]])
-:        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], build=[right])
+:        +- NestedLoopJoin(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], build=[right])
 :           :- Exchange(distribution=[any], exchange_mode=[BATCH])
 :           :  +- Calc(select=[random], reuse_id=[1])
 :           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1], global=[true])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index e281fb4..0377621 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -38,11 +38,11 @@ GROUP BY b
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
 +- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
    +- LogicalFilter(condition=[>($7, 10)])
-      +- LogicalFilter(condition=[=($1, $5)])
-         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-            :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+         :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+         :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+         +- LogicalFilter(condition=[=($cor0.a, $0)])
             +- LogicalSnapshot(period=[$cor0.proctime])
                +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -69,10 +69,10 @@ HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_COUNT(count$0) AS EX
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
-+- LogicalFilter(condition=[=($0, $4)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -98,10 +98,10 @@ WHERE T.c > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
 +- LogicalFilter(condition=[>($2, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $4), =($6, 10))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-         :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -122,12 +122,12 @@ Calc(select=[a, b, c, proctime, id, name, CAST(10) AS age])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalFilter(condition=[=($0, $3)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
-      :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
-      :  +- LogicalFilter(condition=[>($2, 1000)])
-      :     +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :        +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+   :  +- LogicalFilter(condition=[>($2, 1000)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -147,10 +147,10 @@ LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
-+- LogicalFilter(condition=[=($0, $4)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -175,10 +175,10 @@ ON T.a = D.id
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4])
-+- LogicalFilter(condition=[=($0, $4)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -213,11 +213,11 @@ GROUP BY b
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
 +- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
    +- LogicalFilter(condition=[>($7, 10)])
-      +- LogicalFilter(condition=[=($1, $5)])
-         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-            :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+         :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+         :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+         +- LogicalFilter(condition=[=($cor0.a, $0)])
             +- LogicalSnapshot(period=[$cor0.proctime])
                +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -274,20 +274,20 @@ LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4])
          +- LogicalJoin(condition=[true], joinType=[inner])
             :- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3], proctime=[$4], id=[$5], name=[$6], age=[$7])
             :  +- LogicalFilter(condition=[>($7, 10)])
-            :     +- LogicalFilter(condition=[=($1, $5)])
-            :        +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :           :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-            :           :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :           :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+            :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+            :        :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+            :        :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+            :        :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+            :        +- LogicalFilter(condition=[=($cor0.a, $0)])
             :           +- LogicalSnapshot(period=[$cor0.proctime])
             :              +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
             +- LogicalProject(a=[$5], b=[$0])
                +- LogicalFilter(condition=[>($7, 10)])
-                  +- LogicalFilter(condition=[=($1, $5)])
-                     +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{4}])
-                        :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
-                        :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-                        :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                  +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1, 4}])
+                     :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proctime=[PROCTIME()])
+                     :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+                     :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+                     +- LogicalFilter(condition=[=($cor1.a, $0)])
                         +- LogicalSnapshot(period=[$cor1.proctime])
                            +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
index 1c3b540..e1e5afd 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceIntersectWithSemiJoinRuleTest.xml
@@ -32,7 +32,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
@@ -57,7 +57,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
    :- LogicalProject(c=[$2])
    :  +- LogicalFilter(condition=[=(1, 0)])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
@@ -83,7 +83,7 @@ LogicalIntersect(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[semi])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[semi])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
index f000d5c..4470529 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ReplaceMinusWithAntiJoinRuleTest.xml
@@ -32,7 +32,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
@@ -57,7 +57,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
    :- LogicalProject(c=[$2])
    :  +- LogicalFilter(condition=[=(1, 0)])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
@@ -83,7 +83,7 @@ LogicalMinus(all=[false])
     <Resource name="planAfter">
       <![CDATA[
 LogicalAggregate(group=[{0}])
-+- LogicalJoin(condition=[OR(=($0, $1), AND(IS NULL($0), IS NULL($1)))], joinType=[anti])
++- LogicalJoin(condition=[IS NOT DISTINCT FROM($0, $1)], joinType=[anti])
    :- LogicalProject(c=[$2])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]])
    +- LogicalProject(f=[$2])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml
new file mode 100644
index 0000000..9316828
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testWithProjectProjectCorrelate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT (SELECT min(t1.t1d) FROM t3 WHERE t3.t3a = 'test') min_t1d
+FROM   t1
+WHERE  t1a = 'test'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(min_t1d=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
+  LogicalProject($f0=[$cor0.t1d])
+    LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+      LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+})])
++- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(min_t1d=[$10])
++- LogicalJoin(condition=[=($3, $9)], joinType=[left])
+   :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+   +- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)])
+      +- LogicalProject(t1d=[$9], $f0=[$9])
+         +- LogicalJoin(condition=[true], joinType=[inner])
+            :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+            :  +- LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+            +- LogicalAggregate(group=[{0}])
+               +- LogicalProject(t1d=[$3])
+                  +- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithProjectFilterCorrelate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT (SELECT min(t3d) FROM t3 WHERE t3.t3a = t1.t1a) min_t3d,
+       (SELECT max(t2h) FROM t2 WHERE t2.t2a = t1.t1a) max_t2h
+FROM   t1
+    WHERE  t1a = 'test'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(min_t3d=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
+  LogicalProject(t3d=[$3])
+    LogicalFilter(condition=[=($0, $cor0.t1a)])
+      LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+})], max_t2h=[$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[MAX($0)])
+  LogicalProject(t2h=[$7])
+    LogicalFilter(condition=[=($0, $cor1.t1a)])
+      LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+})])
++- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(min_t3d=[$9], max_t2h=[$11])
++- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+   :- LogicalProject(t1a=[$0], t1b=[$1], t1c=[$2], t1d=[$3], t1e=[$4], t1f=[$5], t1g=[$6], t1h=[$7], t1i=[$8], EXPR$0=[$10])
+   :  +- LogicalJoin(condition=[=($0, $9)], joinType=[left])
+   :     :- LogicalFilter(condition=[=($0, _UTF-16LE'test')])
+   :     :  +- LogicalTableScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i)]]])
+   :     +- LogicalAggregate(group=[{0}], EXPR$0=[MIN($1)])
+   :        +- LogicalProject(t3a=[$0], t3d=[$3])
+   :           +- LogicalFilter(condition=[IS NOT NULL($0)])
+   :              +- LogicalTableScan(table=[[default_catalog, default_database, t3, source: [TestTableSource(t3a, t3b, t3c, t3d, t3e, t3f, t3g, t3h, t3i)]]])
+   +- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+      +- LogicalProject(t2a=[$0], t2h=[$7])
+         +- LogicalFilter(condition=[IS NOT NULL($0)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(t2a, t2b, t2c, t2d, t2e, t2f, t2g, t2h, t2i)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
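
For readers decoding the planAfter blocks in this new resource file: the correlated scalar subqueries are decorrelated into left joins against aggregates grouped on the correlation key. A hand-written query that is roughly equivalent to the second test case, ignoring the extra IS NOT NULL filters the planner adds (illustrative only; table and column names come from the test):

    // Sketch of the decorrelated shape of testWithProjectFilterCorrelate.
    object DecorrelatedSketch {
      val decorrelatedSql: String =
        """
          |SELECT agg3.min_t3d, agg2.max_t2h
          |FROM t1
          |LEFT JOIN (SELECT t3a, MIN(t3d) AS min_t3d FROM t3 GROUP BY t3a) agg3
          |  ON agg3.t3a = t1.t1a
          |LEFT JOIN (SELECT t2a, MAX(t2h) AS max_t2h FROM t2 GROUP BY t2a) agg2
          |  ON agg2.t2a = t1.t1a
          |WHERE t1.t1a = 'test'
          |""".stripMargin
    }
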
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
index 8094d28..66a6422 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
@@ -33,7 +33,7 @@ LogicalIntersect(all=[false])
       <![CDATA[
 GroupAggregate(groupBy=[c], select=[c])
 +- Exchange(distribution=[hash[c]])
-   +- Join(joinType=[LeftSemiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[c]])
       :  +- Calc(select=[c])
       :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -128,7 +128,7 @@ LogicalMinus(all=[false])
       <![CDATA[
 GroupAggregate(groupBy=[c], select=[c])
 +- Exchange(distribution=[hash[c]])
-   +- Join(joinType=[LeftAntiJoin], where=[OR(=(c, f), AND(IS NULL(c), IS NULL(f)))], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   +- Join(joinType=[LeftAntiJoin], where=[IS NOT DISTINCT FROM(c, f)], select=[c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[c]])
       :  +- Calc(select=[c])
       :     +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
index bfe8327..11df2ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.xml
@@ -668,11 +668,11 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
+Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[hash[random]])
 :  +- GroupAggregate(groupBy=[random], select=[random])
 :     +- Exchange(distribution=[hash[random]])
-:        +- Join(joinType=[LeftSemiJoin], where=[OR(=(random, random0), AND(IS NULL(random), IS NULL(random0)))], select=[random], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:        +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(random, random0)], select=[random], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :           :- Exchange(distribution=[hash[random]], reuse_id=[1])
 :           :  +- Calc(select=[random])
 :           :     +- SortLimit(orderBy=[EXPR$1 ASC], offset=[0], fetch=[1])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
index a734771..8e56ae1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
@@ -29,9 +29,9 @@ WHERE cast(D.name as bigint) > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
 +- LogicalFilter(condition=[>(CAST($6):BIGINT, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -66,11 +66,11 @@ GROUP BY b
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=[SUM($3)])
 +- LogicalProject(b=[$0], a=[$1], c=[$2], d=[$3])
    +- LogicalFilter(condition=[>($7, 10)])
-      +- LogicalFilter(condition=[=($1, $5)])
-         +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
-            :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proc=[PROCTIME()])
-            :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
-            :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1, 4}])
+         :- LogicalProject(b=[$1], a=[$0], c=[$2], d=[$3], proc=[PROCTIME()])
+         :  +- LogicalAggregate(group=[{0, 1}], c=[SUM($2)], d=[SUM($3)])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+         +- LogicalFilter(condition=[=($cor0.a, $0)])
             +- LogicalSnapshot(period=[$cor0.proc])
                +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -95,9 +95,9 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalFilter(condition=[=($0, $5)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -123,9 +123,9 @@ WHERE T.c > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
 +- LogicalFilter(condition=[>($2, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -152,9 +152,9 @@ WHERE T.c > 1000
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
 +- LogicalFilter(condition=[>($2, 1000)])
-   +- LogicalFilter(condition=[AND(=($0, $5), =($7, 10), =($6, _UTF-16LE'AAA'))])
-      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-         :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[AND(=($cor0.a, $0), =($2, 10), =($1, _UTF-16LE'AAA'))])
          +- LogicalSnapshot(period=[$cor0.proctime])
             +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -175,11 +175,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, C
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalFilter(condition=[=($0, $3)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
-      :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
-      :  +- LogicalFilter(condition=[>($2, 1000)])
-      :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+   :  +- LogicalFilter(condition=[>($2, 1000)])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -200,9 +200,9 @@ Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalFilter(condition=[=($0, $5)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
@@ -227,9 +227,9 @@ ON T.a = D.id
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5])
-+- LogicalFilter(condition=[=($0, $5)])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
          +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index b6c2b48..bb43ecc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -55,14 +55,14 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
   @Test
   def testTimestampAddWithWrongTimestampInterval(): Unit = {
     thrown.expect(classOf[SqlParserException])
-    testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24'))", "2016-06-16")
+    testSqlApi("TIMESTAMPADD(XXX, 1, timestamp '2016-02-24')", "2016-06-16")
   }
 
   @Test
   def testTimestampAddWithWrongTimestampFormat(): Unit = {
     thrown.expect(classOf[SqlParserException])
-    thrown.expectMessage("Illegal TIMESTAMP literal '2016-02-24'")
-    testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016-02-24'))", "2016-06-16")
+    thrown.expectMessage("Illegal TIMESTAMP literal '2016/02/24'")
+    testSqlApi("TIMESTAMPADD(YEAR, 1, timestamp '2016/02/24')", "2016-06-16")
   }
 
   @Test
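
[editor's note, not part of the commit] The first test above previously had an unbalanced trailing ')' which the fix removes, and the second now uses '2016/02/24', which really is an illegal TIMESTAMP literal. For contrast, a well-formed call would look roughly like the sketch below; the expected string is illustrative and depends on how the test base renders timestamps.

    // Hedged illustration: a syntactically valid TIMESTAMPADD call with a
    // supported interval unit and an ISO-formatted TIMESTAMP literal.
    testSqlApi(
      "TIMESTAMPADD(YEAR, 1, TIMESTAMP '2016-02-24 00:00:00')",
      "2017-02-24 00:00:00")  // illustrative expected value
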
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
index 9ba36e9..7169f7a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SubplanReuseTest.scala
@@ -58,8 +58,7 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyPlanNotExpected(sqlQuery, "Reused")
   }
 
-  @Test(expected = classOf[AssertionError])
-  // TODO after CALCITE-3020 fixed, remove expected exception
+  @Test
   def testSubplanReuseWithDifferentRowType(): Unit = {
     util.tableEnv.getConfig.getConf.setBoolean(
       OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 111cf9f..8d7c348 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -74,7 +74,7 @@ class FlinkRelMdHandlerTestBase {
 
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
-  // TODO batch RelNode and stream RelNode should have different PlanningConfigurationBuilder
+  // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
   val plannerContext: PlannerContext =
     new PlannerContext(
@@ -728,7 +728,6 @@ class FlinkRelMdHandlerTestBase {
       cluster,
       flinkLogicalTraits,
       studentFlinkLogicalScan,
-      logicalAgg.indicator,
       logicalAgg.getGroupSet,
       logicalAgg.getGroupSets,
       logicalAgg.getAggCallList
@@ -878,7 +877,6 @@ class FlinkRelMdHandlerTestBase {
       cluster,
       flinkLogicalTraits,
       studentFlinkLogicalScan,
-      logicalAggWithAuxGroup.indicator,
       logicalAggWithAuxGroup.getGroupSet,
       logicalAggWithAuxGroup.getGroupSets,
       logicalAggWithAuxGroup.getAggCallList
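
[editor's note, not part of the commit] Calcite 1.20 drops the boolean `indicator` flag from the Aggregate constructors, which is why the two constructor calls above no longer pass `logicalAgg.indicator` and `logicalAggWithAuxGroup.indicator`. A minimal sketch of building a LogicalAggregate against the 1.20 API, assuming a caller-provided `input` RelNode and `aggCalls` list:

    import java.util.Collections

    import org.apache.calcite.rel.logical.LogicalAggregate
    import org.apache.calcite.util.ImmutableBitSet

    // Hedged sketch: LogicalAggregate.create in Calcite 1.20 takes no
    // `indicator` argument; only groupSet, groupSets and aggCalls remain.
    val agg = LogicalAggregate.create(
      input,                                             // assumed RelNode
      ImmutableBitSet.of(0),                             // groupSet
      Collections.singletonList(ImmutableBitSet.of(0)),  // groupSets
      aggCalls)                                          // assumed java.util.List[AggregateCall]
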
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
index dd5c042..71dc9a4e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQueryAntiJoinTest.scala
@@ -213,8 +213,8 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
   def testNotInWithUncorrelatedOnWhere_Case7(): Unit = {
     util.addTableSource[(Int)]("t1", 'i)
 
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+    // TODO some bugs in SubQueryRemoveRule
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT b FROM l WHERE " +
@@ -725,8 +725,10 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
   def testNotInNotExists3(): Unit = {
     util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
 
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+    // TODO some bugs in SubQueryRemoveRule:
+    //  the resulting RelNode (LogicalJoin(condition=[=($1, $11)], joinType=[left]))
+    //  produced after SubQueryRemoveRule is unexpected
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT c FROM l WHERE (" +
@@ -752,8 +754,8 @@ class SubQueryAntiJoinTest extends SubQueryTestBase {
   def testInNotInExistsNotExists2(): Unit = {
     util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
 
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("unexpected correlate variable $cor0 in the plan")
+    // TODO some bugs in SubQueryRemoveRule
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT c FROM l WHERE (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
index c1225f6..944eb02 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubQuerySemiJoinTest.scala
@@ -208,9 +208,8 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
     util.addTableSource[(Int)]("t1", 'i)
     util.addTableSource[(Int)]("t2", 'j)
 
-    thrown.expect(classOf[TableException])
-    // correlate variable id is unstable, ignore here
-    thrown.expectMessage("unexpected correlate variable $cor")
+    // TODO some bugs in SubQueryRemoveRule
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT b FROM l WHERE" +
@@ -1659,9 +1658,10 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
   def testInExists3(): Unit = {
     util.addTableSource[(Int, Long, String)]("t2", 'l, 'm, 'n)
 
-    thrown.expect(classOf[TableException])
-    // correlate variable id is unstable, ignore here
-    thrown.expectMessage("unexpected correlate variable $cor")
+    // TODO some bugs in SubQueryRemoveRule:
+    //  the resulting RelNode (LogicalJoin(condition=[=($1, $8)], joinType=[left]))
+    //  produced after SubQueryRemoveRule is unexpected
+    thrown.expect(classOf[RuntimeException])
 
     // TODO Calcite does not support project with correlated expressions.
     val sqlQuery = "SELECT c FROM l WHERE (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
index 70e09aa..f3b07cb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/subquery/SubqueryCorrelateVariablesValidationTest.scala
@@ -35,7 +35,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
   util.addTableSource[(String, Short, Int, Long, Float, Double, BigDecimal, Timestamp, Date)](
     "t3", 't3a, 't3b, 't3c, 't3d, 't3e, 't3f, 't3g, 't3h, 't3i)
 
-  @Test(expected = classOf[RuntimeException])
+  @Test
   def testWithProjectProjectCorrelate(): Unit = {
     val sqlQuery =
       """
@@ -46,7 +46,7 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
     util.verifyPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[RuntimeException])
+  @Test
   def testWithProjectFilterCorrelate(): Unit = {
     val sqlQuery =
       """
@@ -111,7 +111,8 @@ class SubqueryCorrelateVariablesValidationTest extends SubQueryTestBase {
     util.verifyPlan(sqlQuery)
   }
 
-  @Test(expected = classOf[RuntimeException])
+  @Test(expected = classOf[AssertionError])
+  // TODO some bugs in RelDecorrelator.AdjustProjectForCountAggregateRule
   def testWithProjectCaseWhenCorrelate(): Unit = {
     val sqlQuery =
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
index 2fa8d39..8a455b7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/SubplanReuseTest.scala
@@ -57,8 +57,7 @@ class SubplanReuseTest extends TableTestBase {
     util.verifyPlanNotExpected(sqlQuery, "Reused")
   }
 
-  @Test(expected = classOf[AssertionError])
-  // TODO after CALCITE-3020 fixed, remove expected exception
+  @Test
   def testSubplanReuseWithDifferentRowType(): Unit = {
     util.tableEnv.getConfig.getConf.setBoolean(
       OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_TABLE_SOURCE_ENABLED, false)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index c3a9e2d..1294ff4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -77,8 +77,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id",
-      "Unsupported join type for semi-join RIGHT",
-      classOf[IllegalArgumentException]
+      "Correlate has invalid join type RIGHT",
+      classOf[AssertionError]
     )
 
     // only support join on raw key of right table
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
index cd55c6a..00a8b76 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/PruneAggregateCallITCase.scala
@@ -46,7 +46,7 @@ class PruneAggregateCallITCase extends BatchTestBase {
     checkResult(
       """
         |SELECT c, a FROM
-        | (SELECT a, c, COUNT(b) as c, SUM(b) as s FROM MyTable GROUP BY a, c) t
+        | (SELECT a, c, COUNT(b) as cnt, SUM(b) as s FROM MyTable GROUP BY a, c) t
         |WHERE s > 1
       """.stripMargin,
       Seq(row("Hello world", 3), row("Hello", 2))
diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index aeea8c1..f407bac 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -72,7 +72,7 @@ under the License.
 			<groupId>org.apache.calcite.avatica</groupId>
 			<artifactId>avatica-core</artifactId>
 			<!-- When updating the Calcite version, make sure to update the version and dependency exclusions -->
-			<version>1.13.0</version>
+			<version>1.15.0</version>
 			<exclusions>
 				<!--
 
@@ -81,9 +81,9 @@ under the License.
 				We exclude all the dependencies of Avatica because currently we only use
 				TimeUnit, TimeUnitRange and SqlDateTimeUtils which only dependent JDK.
 
-				"mvn dependency:tree" as of Avatica 1.13:
+				"mvn dependency:tree" as of Avatica 1.15:
 
-				[INFO] +- org.apache.calcite.avatica:avatica-core:jar:1.13.0:compile
+				[INFO] +- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
 
 				-->
 				<exclusion>