You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/13 01:24:47 UTC

[flink] branch master updated: [FLINK-12487] [table-planner-blink] Introduce planner rules to rewrite expression and merge calc

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b47b591  [FLINK-12487] [table-planner-blink] Introduce planner rules to rewrite expression and merge calc
b47b591 is described below

commit b47b5910c16a423504171a2a29230ff1d8966214
Author: godfrey he <go...@163.com>
AuthorDate: Mon May 13 09:24:30 2019 +0800

    [FLINK-12487] [table-planner-blink] Introduce planner rules to rewrite expression and merge calc
    
    This closes #8411
---
 .../flink/table/codegen/CodeGeneratorContext.scala |   2 +-
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  23 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  24 +-
 .../rules/logical/ConvertToNotInOrInRule.scala     | 187 +++++++++
 .../plan/rules/logical/FlinkCalcMergeRule.scala    | 108 +++++
 .../plan/rules/logical/RewriteCoalesceRule.scala   | 237 +++++++++++
 .../apache/flink/table/plan/batch/sql/CalcTest.xml |   4 +-
 .../table/plan/batch/sql/agg/GroupingSetsTest.xml  |   6 +-
 .../rules/logical/ConvertToNotInOrInRuleTest.xml   | 437 +++++++++++++++++++++
 ...nkAggregateExpandDistinctAggregatesRuleTest.xml |   8 +-
 .../plan/rules/logical/FlinkCalcMergeRuleTest.xml  |  80 ++++
 .../plan/rules/logical/RewriteCoalesceRuleTest.xml | 164 ++++++++
 .../plan/rules/logical/SplitAggregateRuleTest.xml  |  36 +-
 .../flink/table/plan/stream/sql/CalcTest.xml       |   4 +-
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 124 +++---
 .../table/plan/stream/sql/agg/GroupingSetsTest.xml |   4 +-
 .../stream/sql/agg/IncrementalAggregateTest.xml    |  63 ++-
 .../rules/logical/ConvertToNotInOrInRuleTest.scala | 170 ++++++++
 .../rules/logical/FlinkCalcMergeRuleTest.scala     |  87 ++++
 .../rules/logical/RewriteCoalesceRuleTest.scala    | 144 +++++++
 20 files changed, 1760 insertions(+), 152 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
index 83d29df..59cb2b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
@@ -398,7 +398,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
     }
 
     addReusableMember(
-      s"final $setTypeTerm $fieldTerm = new $setTypeTerm(${elements.size})")
+      s"final $setTypeTerm $fieldTerm = new $setTypeTerm(${elements.size});")
 
     elements.foreach { element =>
       val content =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index e87b77e..22f13cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -66,10 +66,22 @@ object FlinkBatchRuleSets {
   )
 
   /**
+    * RuleSet to rewrite coalesce to case when
+    */
+  private val REWRITE_COALESCE_RULES: RuleSet = RuleSets.ofList(
+    // rewrite coalesce to case when
+    RewriteCoalesceRule.FILTER_INSTANCE,
+    RewriteCoalesceRule.PROJECT_INSTANCE,
+    RewriteCoalesceRule.JOIN_INSTANCE,
+    RewriteCoalesceRule.CALC_INSTANCE
+  )
+
+  /**
     * RuleSet to normalize plans for batch
     */
   val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
-    REDUCE_EXPRESSION_RULES.asScala ++
+    REWRITE_COALESCE_RULES.asScala ++
+      REDUCE_EXPRESSION_RULES.asScala ++
       List(
         // Transform window to LogicalWindowAggregate
         BatchLogicalWindowAggregateRule.INSTANCE,
@@ -80,7 +92,8 @@ object FlinkBatchRuleSets {
         //ensure intersect set operator have the same row type
         new CoerceInputsRule(classOf[LogicalIntersect], false),
         //ensure except set operator have the same row type
-        new CoerceInputsRule(classOf[LogicalMinus], false)
+        new CoerceInputsRule(classOf[LogicalMinus], false),
+        ConvertToNotInOrInRule.INSTANCE
       )).asJava)
 
   /**
@@ -139,7 +152,9 @@ object FlinkBatchRuleSets {
     // reorder sort and projection
     ProjectSortTransposeRule.INSTANCE,
     //removes constant keys from an Agg
-    AggregateProjectPullUpConstantsRule.INSTANCE
+    AggregateProjectPullUpConstantsRule.INSTANCE,
+    // push project through a Union
+    ProjectSetOpTransposeRule.INSTANCE
   )
 
   val WINDOW_RULES: RuleSet = RuleSets.ofList(
@@ -197,7 +212,7 @@ object FlinkBatchRuleSets {
     ProjectCalcMergeRule.INSTANCE,
     FilterToCalcRule.INSTANCE,
     ProjectToCalcRule.INSTANCE,
-    CalcMergeRule.INSTANCE
+    FlinkCalcMergeRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 6ac853d..d6cdba8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -66,10 +66,22 @@ object FlinkStreamRuleSets {
   )
 
   /**
+    * RuleSet to rewrite coalesce to case when
+    */
+  private val REWRITE_COALESCE_RULES: RuleSet = RuleSets.ofList(
+    // rewrite coalesce to case when
+    RewriteCoalesceRule.FILTER_INSTANCE,
+    RewriteCoalesceRule.PROJECT_INSTANCE,
+    RewriteCoalesceRule.JOIN_INSTANCE,
+    RewriteCoalesceRule.CALC_INSTANCE
+  )
+
+  /**
     * RuleSet to normalize plans for stream
     */
   val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
-    REDUCE_EXPRESSION_RULES.asScala ++
+    REWRITE_COALESCE_RULES.asScala ++
+      REDUCE_EXPRESSION_RULES.asScala ++
       List(
         StreamLogicalWindowAggregateRule.INSTANCE,
         // slices a project into sections which contain window agg functions
@@ -82,7 +94,8 @@ object FlinkStreamRuleSets {
         //ensure intersect set operator have the same row type
         new CoerceInputsRule(classOf[LogicalIntersect], false),
         //ensure except set operator have the same row type
-        new CoerceInputsRule(classOf[LogicalMinus], false)
+        new CoerceInputsRule(classOf[LogicalMinus], false),
+        ConvertToNotInOrInRule.INSTANCE
       )
     ).asJava)
 
@@ -187,7 +200,7 @@ object FlinkStreamRuleSets {
     ProjectCalcMergeRule.INSTANCE,
     FilterToCalcRule.INSTANCE,
     ProjectToCalcRule.INSTANCE,
-    CalcMergeRule.INSTANCE
+    FlinkCalcMergeRule.INSTANCE
   )
 
   /**
@@ -230,8 +243,9 @@ object FlinkStreamRuleSets {
     // transform over window to topn node
     FlinkLogicalRankRule.INSTANCE,
     // split distinct aggregate to reduce data skew
-    SplitAggregateRule.INSTANCE
-    // TODO add flink calc merge rule
+    SplitAggregateRule.INSTANCE,
+    // merge calc after calc transpose
+    FlinkCalcMergeRule.INSTANCE
   )
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRule.scala
new file mode 100644
index 0000000..4bdc3fc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRule.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.`type`.InternalTypes
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlBinaryOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{AND, EQUALS, IN, NOT_EQUALS, NOT_IN, OR}
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Rule for converting a cascade of predicates to [[IN]] or [[NOT_IN]].
+  *
+  * For example,
+  * 1. convert predicate: (x = 1 OR x = 2 OR x = 3 OR x = 4) AND y = 5
+  * to predicate: x IN (1, 2, 3, 4) AND y = 5.
+  * 2. convert predicate: (x <> 1 AND x <> 2 AND x <> 3 AND x <> 4) AND y = 5
+  * to predicate: x NOT IN (1, 2, 3, 4) AND y = 5.
+  */
+class ConvertToNotInOrInRule
+  extends RelOptRule(
+    operand(classOf[Filter], any),
+    "ConvertToNotInOrInRule") {
+
+  // These threshold values were set by OptimizableHashSet benchmark tests on different types.
+  // threshold for non-float and non-double type
+  private val THRESHOLD: Int = 4
+  // threshold for float and double type
+  private val FRACTIONAL_THRESHOLD: Int = 20
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val filter: Filter = call.rel(0)
+    val condition = filter.getCondition
+
+    // convert equal expression connected by OR to IN
+    val inExpr = convertToNotInOrIn(call.builder(), condition, IN)
+    // convert not-equal expression connected by AND to NOT_IN
+    val notInExpr = convertToNotInOrIn(call.builder(), inExpr.getOrElse(condition), NOT_IN)
+
+    notInExpr match {
+      case Some(expr) =>
+        val newFilter = filter.copy(filter.getTraitSet, filter.getInput, expr)
+        call.transformTo(newFilter)
+      case _ =>
+        // fall back to the IN conversion if the NOT_IN conversion fails
+        inExpr match {
+          case Some(expr) =>
+            val newFilter = filter.copy(filter.getTraitSet, filter.getInput, expr)
+            call.transformTo(newFilter)
+          case _ => // do nothing
+        }
+    }
+  }
+
+  /**
+    * Returns a condition decomposed by [[AND]] or [[OR]].
+    */
+  private def decomposedBy(rex: RexNode, operator: SqlBinaryOperator): Seq[RexNode] = {
+    operator match {
+      case AND => RelOptUtil.conjunctions(rex)
+      case OR => RelOptUtil.disjunctions(rex)
+    }
+  }
+
+  /**
+    * Converts a cascade of predicates to [[IN]] or [[NOT_IN]].
+    *
+    * @param builder The [[RelBuilder]] to build the [[RexNode]].
+    * @param rex     The predicates to be converted.
+    * @return The converted predicates, or None if nothing has been converted.
+    */
+  private def convertToNotInOrIn(
+      builder: RelBuilder,
+      rex: RexNode,
+      toOperator: SqlBinaryOperator): Option[RexNode] = {
+
+    // For example, when converting to [[IN]], fromOperator is [[EQUALS]].
+    // We convert a cascade of [[EQUALS]] to [[IN]].
+    // A connect operator is used to connect the fromOperator.
+    // A composed operator may contain sub [[IN]] or [[NOT_IN]].
+    val (fromOperator, connectOperator, composedOperator) = toOperator match {
+      case IN => (EQUALS, OR, AND)
+      case NOT_IN => (NOT_EQUALS, AND, OR)
+    }
+
+    val decomposed = decomposedBy(rex, connectOperator)
+    val combineMap = new mutable.HashMap[String, mutable.ListBuffer[RexCall]]
+    val rexBuffer = new mutable.ArrayBuffer[RexNode]
+    var beenConverted = false
+
+    // traverse decomposed predicates
+    decomposed.foreach {
+      case call: RexCall =>
+        call.getOperator match {
+          // put same predicates into combine map
+          case `fromOperator` =>
+            (call.operands(0), call.operands(1)) match {
+              case (ref, _: RexLiteral) =>
+                combineMap.getOrElseUpdate(ref.toString, mutable.ListBuffer[RexCall]()) += call
+              case (l: RexLiteral, ref) =>
+                combineMap.getOrElseUpdate(ref.toString, mutable.ListBuffer[RexCall]()) +=
+                  call.clone(call.getType, List(ref, l))
+              case _ => rexBuffer += call
+            }
+
+          // process sub predicates
+          case `composedOperator` =>
+            val newRex = decomposedBy(call, composedOperator).map { r =>
+              convertToNotInOrIn(builder, r, toOperator) match {
+                case Some(ex) =>
+                  beenConverted = true
+                  ex
+                case None => r
+              }
+            }
+            composedOperator match {
+              case AND => rexBuffer += builder.and(newRex)
+              case OR => rexBuffer += builder.or(newRex)
+            }
+
+          case _ => rexBuffer += call
+        }
+
+      case rex => rexBuffer += rex
+    }
+
+    combineMap.values.foreach { list =>
+      if (needConvert(list.toList)) {
+        val inputRef = list.head.getOperands.head
+        val values = list.map(_.getOperands.last)
+        rexBuffer += builder.getRexBuilder.makeCall(toOperator, List(inputRef) ++ values)
+        beenConverted = true
+      } else {
+        connectOperator match {
+          case AND => rexBuffer += builder.and(list)
+          case OR => rexBuffer += builder.or(list)
+        }
+      }
+    }
+
+    if (beenConverted) {
+      // return result if has been converted
+      connectOperator match {
+        case AND => Some(builder.and(rexBuffer))
+        case OR => Some(builder.or(rexBuffer))
+      }
+    } else {
+      None
+    }
+  }
+
+  private def needConvert(rexNodes: List[RexCall]): Boolean = {
+    val inputRef = rexNodes.head.getOperands.head
+    FlinkTypeFactory.toInternalType(inputRef.getType) match {
+      case InternalTypes.FLOAT | InternalTypes.DOUBLE => rexNodes.size >= FRACTIONAL_THRESHOLD
+      case _ => rexNodes.size >= THRESHOLD
+    }
+  }
+}
+
+object ConvertToNotInOrInRule {
+  val INSTANCE = new ConvertToNotInOrInRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
new file mode 100644
index 0000000..8deddfb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Calc, RelFactories}
+import org.apache.calcite.rex.{RexOver, RexProgramBuilder, RexUtil}
+import org.apache.calcite.tools.RelBuilderFactory
+
+import scala.collection.JavaConversions._
+
+/**
+  * This rule is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]].
+  *
+  * Modification:
+  * - Condition in the merged program will be simplified if it exists.
+  * - Don't merge calcs which contain non-deterministic expressions.
+  */
+
+/**
+  * Planner rule that merges a [[Calc]] onto a [[Calc]].
+  *
+  * <p>The resulting [[Calc]] has the same project list as the upper [[Calc]],
+  * but expressed in terms of the lower [[Calc]]'s inputs.
+  */
+class FlinkCalcMergeRule(relBuilderFactory: RelBuilderFactory) extends RelOptRule(
+  operand(classOf[Calc],
+    operand(classOf[Calc], any)),
+  relBuilderFactory,
+  "FlinkCalcMergeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val topCalc: Calc = call.rel(0)
+    val bottomCalc: Calc = call.rel(1)
+
+    // Don't merge a calc which contains windowed aggregates onto a
+    // calc. That would effectively be pushing a windowed aggregate down
+    // through a filter.
+    val topProgram = topCalc.getProgram
+    if (RexOver.containsOver(topProgram)) {
+      return false
+    }
+
+    // Don't merge Calcs which contain non-deterministic expressions
+    topProgram.getExprList.forall(RexUtil.isDeterministic) &&
+      bottomCalc.getProgram.getExprList.forall(RexUtil.isDeterministic)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val topCalc: Calc = call.rel(0)
+    val bottomCalc: Calc = call.rel(1)
+
+    val topProgram = topCalc.getProgram
+    val rexBuilder = topCalc.getCluster.getRexBuilder
+    // Merge the programs together.
+    val mergedProgram = RexProgramBuilder.mergePrograms(
+      topCalc.getProgram, bottomCalc.getProgram, rexBuilder)
+    require(mergedProgram.getOutputRowType eq topProgram.getOutputRowType)
+
+    val newMergedProgram = if (mergedProgram.getCondition != null) {
+      val condition = mergedProgram.expandLocalRef(mergedProgram.getCondition)
+      val simplifiedCondition = FlinkRexUtil.simplify(rexBuilder, condition)
+      if (simplifiedCondition.toString == condition.toString) {
+        mergedProgram
+      } else {
+        val programBuilder = RexProgramBuilder.forProgram(mergedProgram, rexBuilder, true)
+        programBuilder.clearCondition()
+        programBuilder.addCondition(simplifiedCondition)
+        programBuilder.getProgram(true)
+      }
+    } else {
+      mergedProgram
+    }
+
+    val newCalc = topCalc.copy(topCalc.getTraitSet, bottomCalc.getInput, newMergedProgram)
+    if (newCalc.getDigest == bottomCalc.getDigest) {
+      // newCalc is equivalent to bottomCalc,
+      // which means that topCalc
+      // must be trivial. Take it out of the game.
+      call.getPlanner.setImportance(topCalc, 0.0)
+    }
+    call.transformTo(newCalc)
+  }
+
+}
+
+object FlinkCalcMergeRule {
+  val INSTANCE = new FlinkCalcMergeRule(RelFactories.LOGICAL_BUILDER)
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRule.scala
new file mode 100644
index 0000000..bedfa53
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRule.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Calc, Filter, Join, Project}
+import org.apache.calcite.rel.logical.{LogicalCalc, LogicalFilter, LogicalJoin, LogicalProject}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Collection of planner rules that transform `Coalesce` to `Case When` RexNode trees.
+  *
+  * <p>Currently this is only used for natural join, for explicit Coalesce
+  * Calcite already replaces it with Case When.
+  *
+  * <p>There are four transformation contexts:
+  * <ul>
+  * <li>Project project list
+  * <li>Join condition
+  * <li>Filter condition
+  * <li>Calc expression list
+  * </ul>
+  */
+abstract class RewriteCoalesceRule[T <: RelNode](
+    clazz: Class[T],
+    description: String)
+  extends RelOptRule(
+    operand(clazz, any),
+    description) {
+
+  private class CoalesceToCaseShuttle(rexBuilder: RexBuilder) extends RexShuttle {
+    override def visitCall(call: RexCall): RexNode = {
+      call.getKind match {
+        case SqlKind.COALESCE =>
+          val operands = call.getOperands
+          if (operands.size == 1) {
+            operands.head
+          } else {
+            val operandsExceptLast = Util.skipLast(call.getOperands)
+            val args: ImmutableList.Builder[RexNode] = ImmutableList.builder()
+            operandsExceptLast.foreach { operand =>
+              args.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, operand), operand)
+            }
+            args.add(call.getOperands.last)
+            rexBuilder.makeCall(SqlStdOperatorTable.CASE, args.build)
+          }
+        case _ => super.visitCall(call)
+      }
+    }
+  }
+
+  protected def replace(input: RexNode,
+      rexBuilder: RexBuilder): RexNode = {
+    val shuttle = new CoalesceToCaseShuttle(rexBuilder)
+    input.accept(shuttle)
+  }
+
+  protected def existsCoalesce(rexNode: RexNode): Boolean = {
+    class CoalesceFinder extends RexVisitorImpl[Unit](true) {
+      var found = false
+
+      override def visitCall(call: RexCall): Unit = {
+        call.getKind match {
+          case SqlKind.COALESCE => found = true
+          case _ => super.visitCall(call)
+        }
+      }
+
+      def isFound: Boolean = found
+    }
+    val finder = new CoalesceFinder
+    rexNode.accept(finder)
+    finder.isFound
+  }
+
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in filter condition to `Case When`.
+  */
+class FilterRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalFilter],
+    "FilterRewriteCoalesceRule") {
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val filter: Filter = call.rel(0)
+    existsCoalesce(filter.getCondition)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val filter: Filter = call.rel(0)
+    val relBuilder: RelBuilder = call.builder()
+    val rexBuilder: RexBuilder = relBuilder.getRexBuilder
+    // Transform the filter recursively. This may change every Coalesce in the filter
+    // to Case When, which is fine for now because the filter will never be matched again
+    // by this rule after the transformation.
+    val newCondition = replace(filter.getCondition, rexBuilder)
+    val newFilter = relBuilder
+      .push(filter.getInput)
+      .filter(newCondition)
+      .build()
+    call.transformTo(newFilter)
+  }
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in project list to `Case When`.
+  */
+class ProjectRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalProject],
+    "ProjectRewriteCoalesceRule") {
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val prj: Project = call.rel(0)
+    prj.getProjects.exists(p => existsCoalesce(p))
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val prj: Project = call.rel(0)
+    val relBuilder: RelBuilder = call.builder()
+    val rexBuilder: RexBuilder = relBuilder.getRexBuilder
+    // Transform the project recursively. This may change every Coalesce in the project
+    // to Case When, which is fine for now because the project will never be matched again
+    // by this rule after the transformation.
+    val newProjects = prj.getProjects.map(p => replace(p, rexBuilder))
+    val newProject = relBuilder
+      .push(prj.getInput)
+      .project(newProjects)
+      .build()
+    call.transformTo(newProject)
+  }
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in join condition to `Case When`.
+  */
+class JoinRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalJoin],
+    "JoinRewriteCoalesceRule") {
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val prj: Join = call.rel(0)
+    existsCoalesce(prj.getCondition)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: Join = call.rel(0)
+    val relBuilder: RelBuilder = call.builder()
+    val rexBuilder: RexBuilder = relBuilder.getRexBuilder
+    val newCondition = replace(join.getCondition, rexBuilder)
+    // Transform the join recursively. This may change every Coalesce in the join
+    // to Case When, which is fine for now because the join will never be matched again
+    // by this rule after the transformation.
+    val newJoin = join.copy(
+      join.getTraitSet,
+      newCondition,
+      join.getLeft,
+      join.getRight,
+      join.getJoinType,
+      join.isSemiJoinDone)
+    call.transformTo(newJoin)
+  }
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in calc expression list to `Case When`.
+  */
+class CalcRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalCalc],
+    "CalcRewriteCoalesceRule") {
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: Calc = call.rel(0)
+    val program = calc.getProgram
+    val exprList = program.getExprList
+    exprList.exists(p => existsCoalesce(p))
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: Calc = call.rel(0)
+    val relBuilder: RelBuilder = call.builder()
+    val rexBuilder: RexBuilder = relBuilder.getRexBuilder
+    // Transform the Calc recursively. This may change every Coalesce in the Calc
+    // to Case When, which is fine for now because the Calc will never be matched again
+    // by this rule after the transformation.
+    val program = calc.getProgram
+    val exprList = program.getExprList.map(expr => replace(expr, rexBuilder))
+    val builder: RexProgramBuilder = new RexProgramBuilder(
+      calc.getInput.getRowType, calc.getCluster.getRexBuilder)
+    val list = exprList.map(expr => builder.registerInput(expr))
+    if (program.getCondition != null) {
+      val conditionIndex = program.getCondition.getIndex
+      builder.addCondition(list.get(conditionIndex))
+    }
+    program.getProjectList.zipWithIndex.foreach {
+      case (projectExpr, idx) =>
+        val index = projectExpr.getIndex
+        builder.addProject(list.get(index).getIndex,
+          program.getOutputRowType.getFieldNames.get(idx))
+    }
+
+    val newCalc = calc.copy(calc.getTraitSet, calc.getInput, builder.getProgram)
+    call.transformTo(newCalc)
+  }
+}
+
+object RewriteCoalesceRule {
+  val FILTER_INSTANCE = new FilterRewriteCoalesceRule
+  val PROJECT_INSTANCE = new ProjectRewriteCoalesceRule
+  val JOIN_INSTANCE = new JoinRewriteCoalesceRule
+  val CALC_INSTANCE = new CalcRewriteCoalesceRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
index 48faff1..d22ee5e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
@@ -103,7 +103,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(OR(=(b, 1), =(b, 3), =(b, 4), =(b, 5), =(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -210,7 +210,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c], where=[OR(AND(<>(b, 1), <>(b, 3), <>(b, 4), <>(b, 5), <>(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, c], where=[OR(NOT IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml
index ef039af..07fc423 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml
@@ -728,10 +728,10 @@ LogicalProject(c=[$0])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c])
-+- SortLimit(orderBy=[$f2 ASC, c ASC], offset=[0], fetch=[200], global=[true])
++- SortLimit(orderBy=[EXPR$1 ASC, c ASC], offset=[0], fetch=[200], global=[true])
    +- Exchange(distribution=[single])
-      +- SortLimit(orderBy=[$f2 ASC, c ASC], offset=[0], fetch=[200], global=[false])
-         +- Calc(select=[deptno, c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS $f2])
+      +- SortLimit(orderBy=[EXPR$1 ASC, c ASC], offset=[0], fetch=[200], global=[false])
+         +- Calc(select=[c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS EXPR$1])
             +- HashAggregate(isMerge=[true], groupBy=[deptno, $e], select=[deptno, $e, Final_COUNT(count1$0) AS c])
                +- Exchange(distribution=[hash[deptno, $e]])
                   +- LocalHashAggregate(groupBy=[deptno, $e], select=[deptno, $e, Partial_COUNT(*) AS count1$0])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.xml
new file mode 100644
index 0000000..cf8dae6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.xml
@@ -0,0 +1,437 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testConvertToIn_EqualsToThreshold_Int">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[IN($0, 1, 2, 3, 4)])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_GreaterThanThreshold_Double">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE d = 1 OR d = 2 OR d = 3 OR d = 4 OR d = 5 OR d = 6 OR d = 7 OR d = 8 OR d = 9 OR d = 10 OR d = 11 OR d = 12 OR d = 13 OR d = 14 OR d = 15 OR d = 16 OR d = 17 OR d = 18 OR d = 19 OR d = 20]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($3, 1), =($3, 2), =($3, 3), =($3, 4), =($3, 5), =($3, 6), =($3, 7), =($3, 8), =($3, 9), =($3, 10), =($3, 11), =($3, 12), =($3, 13), =($3, 14), =($3, 15), =($3, 16), =($3, 17), =($3, 18), =($3, 19), =($3, 20))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[IN($3, 1:DOUBLE, 2:DOUBLE, 3:DOUBLE, 4:DOUBLE, 5:DOUBLE, 6:DOUBLE, 7:DOUBLE, 8:DOUBLE, 9:DOUBLE, 10:DOUBLE, 11:DOUBLE, 12:DOUBLE, 13:DOUBLE, 14:DOUBLE, 15:DOUBLE, 16:DOUBLE, 17:DOUBLE, 18:DOUBLE, 19:DOUBLE, 20:DOUBLE)])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_GreaterThanThreshold_Int">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4), =($0, 5))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[IN($0, 1, 2, 3, 4, 5)])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_LessThanThreshold_Double">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE d = 1 OR d = 2 OR d = 3 OR d = 4 OR d = 5 OR d = 6 OR d = 7 OR d = 8 OR d = 9 OR d = 10 OR d = 11 OR d = 12 OR d = 13 OR d = 14 OR d = 15 OR d = 16 OR d = 17 OR d = 18 OR d = 19]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($3, 1), =($3, 2), =($3, 3), =($3, 4), =($3, 5), =($3, 6), =($3, 7), =($3, 8), =($3, 9), =($3, 10), =($3, 11), =($3, 12), =($3, 13), =($3, 14), =($3, 15), =($3, 16), =($3, 17), =($3, 18), =($3, 19))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($3, 1), =($3, 2), =($3, 3), =($3, 4), =($3, 5), =($3, 6), =($3, 7), =($3, 8), =($3, 9), =($3, 10), =($3, 11), =($3, 12), =($3, 13), =($3, 14), =($3, 15), =($3, 16), =($3, 17), =($3, 18), =($3, 19))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_LessThanThreshold_Int">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_WithAnd1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE (a = 1 OR a = 2 OR a = 3 OR a = 4) AND b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4)), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(IN($0, 1, 2, 3, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_WithAnd2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 AND b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), =($1, 1)))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), =($1, 1)))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_WithOr1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), IN($0, 1, 2, 3, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToIn_WithOr2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR b = 1 OR a = 3 OR a = 4]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($1, 1), =($0, 3), =($0, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), IN($0, 1, 2, 3, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToInAndNotIn1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1 OR (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4), =($1, 1), AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(NOT IN($0, 1, 2, 3, 4), =($1, 1), IN($0, 1, 2, 3, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToInAndNotIn2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE b = 1 OR a = 1 OR a = 2 OR a = 3 OR a = 4  AND (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), =($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), <>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), =($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), NOT IN($0, 1, 2, 3, 4)))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToInAndNotIn3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE b = 1 OR b = 2 OR (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND c = 1) OR b = 3 OR b = 4 OR c = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), =($1, 2), AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($2, 1)), =($1, 3), =($1, 4), =($2, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($2, 1), NOT IN($0, 1, 2, 3, 4)), IN($1, 1:BIGINT, 2:BIGINT, 3:BIGINT, 4:BIGINT), =($2, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_EqualsToThreshold_Int">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[NOT IN($0, 1, 2, 3, 4)])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_GreaterThanThreshold_Double">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE d <> 1 AND d <> 2 AND d <> 3 AND d <> 4 AND d <> 5 AND d <> 6 AND d <> 7 AND d <> 8 AND d <> 9 AND d <> 10 AND d <> 11 AND d <> 12 AND d <> 13 AND d <> 14 AND d <> 15 AND d <> 16 AND d <> 17 AND d <> 18 AND d <> 19 AND d <> 20]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($3, 1), <>($3, 2), <>($3, 3), <>($3, 4), <>($3, 5), <>($3, 6), <>($3, 7), <>($3, 8), <>($3, 9), <>($3, 10), <>($3, 11), <>($3, 12), <>($3, 13), <>($3, 14), <>($3, 15), <>($3, 16), <>($3, 17), <>($3, 18), <>($3, 19), <>($3, 20))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[NOT IN($3, 1:DOUBLE, 2:DOUBLE, 3:DOUBLE, 4:DOUBLE, 5:DOUBLE, 6:DOUBLE, 7:DOUBLE, 8:DOUBLE, 9:DOUBLE, 10:DOUBLE, 11:DOUBLE, 12:DOUBLE, 13:DOUBLE, 14:DOUBLE, 15:DOUBLE, 16:DOUBLE, 17:DOUBLE, 18:DOUBLE, 19:DOUBLE, 20:DOUBLE)])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_GreaterThanThreshold_Int">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND a = 5]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($0, 5))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(=($0, 5), NOT IN($0, 1, 2, 3, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_LessThanThreshold_Double">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE d <> 1 AND d <> 2 AND d <> 3 AND d <> 4 AND d <> 5 AND d <> 6 AND d <> 7 AND d <> 8 AND d <> 9 AND d <> 10 AND d <> 11 AND d <> 12 AND d <> 13 AND d <> 14 AND d <> 15 AND d <> 16 AND d <> 17 AND d <> 18 AND d <> 19]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($3, 1), <>($3, 2), <>($3, 3), <>($3, 4), <>($3, 5), <>($3, 6), <>($3, 7), <>($3, 8), <>($3, 9), <>($3, 10), <>($3, 11), <>($3, 12), <>($3, 13), <>($3, 14), <>($3, 15), <>($3, 16), <>($3, 17), <>($3, 18), <>($3, 19))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($3, 1), <>($3, 2), <>($3, 3), <>($3, 4), <>($3, 5), <>($3, 6), <>($3, 7), <>($3, 8), <>($3, 9), <>($3, 10), <>($3, 11), <>($3, 12), <>($3, 13), <>($3, 14), <>($3, 15), <>($3, 16), <>($3, 17), <>($3, 18), <>($3, 19))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_LessThanThreshold_Int">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_WithAnd1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(=($1, 1), NOT IN($0, 1, 2, 3, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_WithAnd2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2  AND b = 1 AND a <> 3 AND a <> 4]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), =($1, 1), <>($0, 3), <>($0, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(=($1, 1), NOT IN($0, 1, 2, 3, 4))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_WithOr1">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4) OR b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(NOT IN($0, 1, 2, 3, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_WithOr2">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 OR b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(NOT IN($0, 1, 2, 3, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testConvertToNotIn_WithOr3">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable WHERE a <> 1 OR a <> 2 OR a <> 3 OR a <> 4 OR b = 1]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($1, 1))])
+   +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
index 7d2de86..3bbbd71 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
@@ -54,8 +54,8 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[EXPR$0, EXPR$1])
-+- FlinkLogicalAggregate(group=[{1}], EXPR$0=[SUM($0) FILTER $3], EXPR$1=[COUNT($2) FILTER $4])
-   +- FlinkLogicalCalc(select=[a, b, c, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 4:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 4:BIGINT), 4) AS $g_4])
++- FlinkLogicalAggregate(group=[{0}], EXPR$0=[SUM($1) FILTER $3], EXPR$1=[COUNT($2) FILTER $4])
+   +- FlinkLogicalCalc(select=[b, a, c, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2])
       +- FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
          +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[4]}])
             +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -99,8 +99,8 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
-+- FlinkLogicalAggregate(group=[{2}], EXPR$0=[COUNT($0) FILTER $5], EXPR$1=[SUM($1) FILTER $6], EXPR$2=[MIN($3) FILTER $7], EXPR$3=[MIN($4) FILTER $7])
-   +- FlinkLogicalCalc(select=[a, b, c, EXPR$2, EXPR$3, =(CASE(=($e, 2:BIGINT), 2:BIGINT, =($e, 4:BIGINT), 4:BIGINT, 6:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 2:BIGINT, =($e, 4:BIGINT), 4:BIGINT, 6:BIGINT), 4) AS $g_4, =(CASE(=($e, 2:BIGINT), 2:BIGINT, =($e, 4:BIGINT), 4:BIGINT, 6:BIGINT), 6) AS $g_6])
++- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT($1) FILTER $5], EXPR$1=[SUM($2) FILTER $6], EXPR$2=[MIN($3) FILTER $7], EXPR$3=[MIN($4) FILTER $7])
+   +- FlinkLogicalCalc(select=[c, a, b, EXPR$2, EXPR$3, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
       +- FlinkLogicalAggregate(group=[{0, 1, 2, 3}], EXPR$2=[MAX($4)], EXPR$3=[MIN($4)])
          +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], c=[$2], $e=[2], a_0=[$0]}, {a=[null], b=[$1], c=[$2], $e=[4], a_0=[$0]}, {a=[null], b=[null], c=[$2], $e=[6], a_0=[$0]}])
             +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.xml
new file mode 100644
index 0000000..51bca73
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCalcMergeWithSameDigest">
+    <Resource name="sql">
+      <![CDATA[SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b = a]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[=($1, $0)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2])
+      +- LogicalFilter(condition=[=($0, $1)])
+         +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, b], where=[=(a, b)])
++- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+      +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, a1], where=[>(a1, 10)])
++- FlinkLogicalCalc(select=[a, random_udf(a) AS a1])
+   +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM (SELECT a FROM MyTable) t WHERE random_udf(a) > 10]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[>(random_udf($0), 10)])
+   +- LogicalProject(a=[$0])
+      +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalCalc(select=[a])
++- FlinkLogicalCalc(select=[a], where=[>(random_udf(a), 10)])
+   +- FlinkLogicalCalc(select=[a])
+      +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.xml
new file mode 100644
index 0000000..8737c22
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCalcite1018">
+    <Resource name="sql">
+      <![CDATA[
+select * from (select * from scott_emp) e left join (
+    select * from scott_dept d) using (deptno)
+    order by empno limit 10
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(deptno=[COALESCE($7, $8)], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
+   +- LogicalJoin(condition=[=($7, $8)], joinType=[left])
+      :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+      :  +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+      +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+         +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject($f0=[CASE(IS NOT NULL($7), $7, $8)], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
+   +- LogicalJoin(condition=[=($7, $8)], joinType=[left])
+      :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+      :  +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+      +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+         +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCoalesceConstantReduce">
+    <Resource name="sql">
+      <![CDATA[
+select * from lateral (select * from scott_emp) as e
+    join (table scott_dept) using (deptno)
+    where e.deptno = 10
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(deptno=[COALESCE($7, $8)], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
++- LogicalFilter(condition=[=($7, 10)])
+   +- LogicalJoin(condition=[=($7, $8)], joinType=[inner])
+      :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+      :  +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+      +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+         +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject($f0=[CAST(10):INTEGER], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
++- LogicalFilter(condition=[=($7, 10)])
+   +- LogicalJoin(condition=[=($7, $8)], joinType=[inner])
+      :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+      :  +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+      +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+         +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNaturalJoinLeftOuter">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM scott_dept
+    natural left join scott_emp
+    order by scott_dept.deptno, scott_emp.deptno
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+   +- LogicalProject(deptno=[COALESCE($0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno0=[$0], deptno1=[$10])
+      +- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+         :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+         +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+   +- LogicalProject($f0=[CASE(IS NOT NULL($0), $0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno=[$0], deptno0=[$10])
+      +- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+         :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+         +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNaturalJoinRightOuter">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM scott_dept
+    natural right join scott_emp
+    order by scott_dept.deptno, scott_emp.deptno
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+   +- LogicalProject(deptno=[COALESCE($0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno0=[$0], deptno1=[$10])
+      +- LogicalJoin(condition=[=($0, $10)], joinType=[right])
+         :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+         +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+   +- LogicalProject($f0=[CASE(IS NOT NULL($0), $0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno=[$0], deptno0=[$10])
+      +- LogicalJoin(condition=[=($0, $10)], joinType=[right])
+         :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+         +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNaturalJoinWithPredicates">
+    <Resource name="sql">
+      <![CDATA[
+select * from scott_dept natural join scott_emp where empno = 1
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(deptno=[COALESCE($0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalFilter(condition=[=($3, 1)])
+   +- LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+      :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+      +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LogicalProject($f0=[CASE(IS NOT NULL($0), $0, $10)], dname=[$1], loc=[$2], empno=[CAST(1):INTEGER], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalFilter(condition=[=($3, 1)])
+   +- LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+      :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+      +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml
index 87b17ae..78066af 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -41,9 +41,8 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[SU
 +- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $5], agg#1=[SUM($1) FILTER $6], agg#2=[SUM($1) FILTER $7])
    +- FlinkLogicalCalc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
       +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
-         +- FlinkLogicalCalc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-            +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3])
-               +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+         +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+            +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -90,8 +89,7 @@ FlinkLogicalJoin(condition=[=($1, $4)], joinType=[inner])
 :                       +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
 :                          +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
 :                             +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-:                                +- FlinkLogicalCalc(select=[a, b])
-:                                   +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+:                                +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -112,8 +110,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)])
 FlinkLogicalAggregate(group=[{}], agg#0=[$SUM0($1)])
 +- FlinkLogicalAggregate(group=[{1}], agg#0=[COUNT(DISTINCT $0)])
    +- FlinkLogicalCalc(select=[c, MOD(HASH_CODE(c), 100) AS $f1])
-      +- FlinkLogicalCalc(select=[c])
-         +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -135,8 +132,7 @@ FlinkLogicalAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[SUM($3)])
    +- FlinkLogicalCalc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
       +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
          +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-            +- FlinkLogicalCalc(select=[a, b])
-               +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -211,8 +207,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)])
 FlinkLogicalAggregate(group=[{}], agg#0=[$SUM0($1)])
 +- FlinkLogicalAggregate(group=[{1}], agg#0=[COUNT(DISTINCT $0)])
    +- FlinkLogicalCalc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-      +- FlinkLogicalCalc(select=[c])
-         +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -256,15 +251,13 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- FlinkLogicalCalc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
-   +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
-      +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
-         +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-            +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
-               +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                  +- FlinkLogicalCalc(select=[a, b])
-                     +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+FlinkLogicalCalc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
+   +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
+      +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+         +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+            +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+               +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -284,8 +277,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)])
 FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])
 +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1)])
    +- FlinkLogicalCalc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-      +- FlinkLogicalCalc(select=[a, c])
-         +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+      +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml
index 48faff1..d22ee5e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml
@@ -103,7 +103,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(OR(=(b, 1), =(b, 3), =(b, 4), =(b, 5), =(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -210,7 +210,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, c], where=[OR(AND(<>(b, 1), <>(b, 3), <>(b, 4), <>(b, 5), <>(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, c], where=[OR(NOT IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
index 728c121..4d869c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -100,9 +100,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
       +- Exchange(distribution=[hash[a, $f4]])
          +- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
             +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
-               +- Calc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                  +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
-                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -135,9 +134,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
             +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
                +- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
                   +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
-                     +- Calc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                        +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
-                           +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                        +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -239,14 +237,12 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
    +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
       +- Exchange(distribution=[hash[b, $f2]], updateAsRetraction=[true], accMode=[AccRetract])
          +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-            +- Calc(select=[b, c], updateAsRetraction=[true], accMode=[AccRetract])
-               +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, MAX($f3) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-                  +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[AccRetract])
-                     +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, MAX(b) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
-                        +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
-                           +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
-                              +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
-                                 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+            +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, MAX($f3) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+               +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[AccRetract])
+                  +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, MAX(b) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+                     +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+                        +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
+                           +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -279,16 +275,14 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
    +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
       +- Exchange(distribution=[hash[b, $f2]], updateAsRetraction=[true], accMode=[AccRetract])
          +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-            +- Calc(select=[b, c], updateAsRetraction=[true], accMode=[AccRetract])
-               +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-                  +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
-                     +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, MAX($f3) AS max$1, COUNT_RETRACT(*) AS count1$2], updateAsRetraction=[true], accMode=[Acc])
-                        +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, MAX(max$1) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
-                           +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
-                              +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
-                                 +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
-                                    +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
-                                       +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+            +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+               +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+                  +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, MAX($f3) AS max$1, COUNT_RETRACT(*) AS count1$2], updateAsRetraction=[true], accMode=[Acc])
+                     +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, MAX(max$1) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+                        +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+                           +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+                              +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
+                                 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -492,8 +486,7 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f2_0) AS $f0, SU
          +- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
             +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
                +- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-                  +- Calc(select=[a, b])
-                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -519,8 +512,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
                +- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
                   +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
                      +- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-                        +- Calc(select=[a, b])
-                           +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                        +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -811,8 +803,7 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f1_0) AS $f0])
    +- GroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(DISTINCT c) AS $f1_0])
       +- Exchange(distribution=[hash[$f1]])
          +- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-            +- Calc(select=[c])
-               +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -836,8 +827,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
          +- Exchange(distribution=[hash[$f1]])
             +- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
                +- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-                  +- Calc(select=[c])
-                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1004,17 +994,15 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- Calc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
-   +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
-      +- Exchange(distribution=[hash[a]])
-         +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
-            +- Exchange(distribution=[hash[a, $f2]])
-               +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-                  +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
-                     +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                        +- Calc(select=[a, b])
-                           +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
+   +- Exchange(distribution=[hash[a]])
+      +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
+         +- Exchange(distribution=[hash[a, $f2]])
+            +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+               +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+                  +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1031,19 +1019,17 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- Calc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
-   +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, SUM_RETRACT((sum$1, count$2)) AS $f2, $SUM0_RETRACT(sum$3) AS $f3, $SUM0_RETRACT(sum$4) AS $f4])
-      +- Exchange(distribution=[hash[a]])
-         +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
-            +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, SUM(sum$1) AS $f3, $SUM0(sum$2) AS $f4, COUNT(count$3) AS $f5])
-               +- Exchange(distribution=[hash[a, $f2]])
-                  +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
-                     +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-                        +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
-                           +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                              +- Calc(select=[a, b])
-                                 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, SUM_RETRACT((sum$1, count$2)) AS $f2, $SUM0_RETRACT(sum$3) AS $f3, $SUM0_RETRACT(sum$4) AS $f4])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
+         +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, SUM(sum$1) AS $f3, $SUM0(sum$2) AS $f4, COUNT(count$3) AS $f5])
+            +- Exchange(distribution=[hash[a, $f2]])
+               +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
+                  +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+                     +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+                        +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+                           +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1106,8 +1092,7 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
    +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT c) AS $f2_0])
       +- Exchange(distribution=[hash[a, $f2]])
          +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-            +- Calc(select=[a, c])
-               +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+            +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1131,8 +1116,7 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
          +- Exchange(distribution=[hash[a, $f2]])
             +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
                +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-                  +- Calc(select=[a, c])
-                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -1228,11 +1212,10 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
       +- Exchange(distribution=[hash[a, $f3]], updateAsRetraction=[true], accMode=[AccRetract])
          +- Calc(select=[a, b, $f2, $f3, =($e, 0) AS $g_0, =($e, 1) AS $g_1], updateAsRetraction=[true], accMode=[AccRetract])
             +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}], updateAsRetraction=[true], accMode=[AccRetract])
-               +- Calc(select=[a, b, $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
-                  +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-                     +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
-                        +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
-                           +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+               +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+                  +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+                     +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+                        +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -1266,12 +1249,11 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
             +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
                +- Calc(select=[a, b, $f2, $f3, =($e, 0) AS $g_0, =($e, 1) AS $g_1], updateAsRetraction=[true], accMode=[AccRetract])
                   +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}], updateAsRetraction=[true], accMode=[AccRetract])
-                     +- Calc(select=[a, b, $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
-                        +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-                           +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
-                              +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
-                                 +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
-                                    +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                     +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+                        +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+                           +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+                              +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
+                                 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml
index 379e14d..0d6d340 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml
@@ -700,9 +700,9 @@ LogicalProject(c=[$0])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c])
-+- Sort(orderBy=[$f2 ASC, c ASC])
++- Sort(orderBy=[EXPR$1 ASC, c ASC])
    +- Exchange(distribution=[single])
-      +- Calc(select=[deptno, c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS $f2])
+      +- Calc(select=[c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS EXPR$1])
          +- GroupAggregate(groupBy=[deptno, $e], select=[deptno, $e, COUNT(*) AS c])
             +- Exchange(distribution=[hash[deptno, $e]])
                +- Expand(projects=[{deptno=[$0], $e=[0]}, {deptno=[null], $e=[1]}])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 2bb5deb..5b4a016 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -44,9 +44,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
          +- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
             +- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
                +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
-                  +- Calc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
-                     +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
-                        +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                  +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -79,15 +78,13 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
    +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
       +- Exchange(distribution=[hash[b, $f2]], updateAsRetraction=[true], accMode=[AccRetract])
          +- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-            +- Calc(select=[b, c], updateAsRetraction=[true], accMode=[AccRetract])
-               +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-                  +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
-                     +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, MAX(max$1) AS max$1], updateAsRetraction=[true], accMode=[Acc])
-                        +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
-                           +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
-                              +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
-                                 +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
-                                    +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+            +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+               +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+                  +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, MAX(max$1) AS max$1], updateAsRetraction=[true], accMode=[Acc])
+                     +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+                        +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+                           +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
+                              +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
@@ -150,8 +147,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0, SU
             +- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
                +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
                   +- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
-                     +- Calc(select=[a, b])
-                        +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+                     +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -237,8 +233,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0])
       +- Exchange(distribution=[hash[$f1]])
          +- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
             +- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
-               +- Calc(select=[c])
-                  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -285,18 +280,16 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- Calc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
-   +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, $SUM0(count$3) AS $f4])
-      +- Exchange(distribution=[hash[a]])
-         +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
-            +- Exchange(distribution=[hash[a, $f2]])
-               +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
-                  +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
-                     +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
-                        +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-                           +- Calc(select=[a, b])
-                              +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, $SUM0(count$3) AS $f4])
+   +- Exchange(distribution=[hash[a]])
+      +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
+         +- Exchange(distribution=[hash[a, $f2]])
+            +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
+               +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+                  +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+                     +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+                        +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -319,8 +312,7 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
       +- Exchange(distribution=[hash[a, $f2]])
          +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
             +- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
-               +- Calc(select=[a, c])
-                  +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+               +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -353,12 +345,11 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
          +- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
             +- Calc(select=[a, b, $f2, $f3, =($e, 0) AS $g_0, =($e, 1) AS $g_1], updateAsRetraction=[true], accMode=[AccRetract])
                +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}], updateAsRetraction=[true], accMode=[AccRetract])
-                  +- Calc(select=[a, b, $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
-                     +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
-                        +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
-                           +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
-                              +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
-                                 +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+                  +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+                     +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+                        +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+                           +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
+                              +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.scala
new file mode 100644
index 0000000..858e793
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program._
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[ConvertToNotInOrInRule]].
+  */
+class ConvertToNotInOrInRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(ConvertToNotInOrInRule.INSTANCE))
+        .build())
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(programs).build()
+    util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+    util.addTableSource[(Int, Long, Float, Double, String)]("MyTable", 'a, 'b, 'c, 'd, 'e)
+  }
+
+  @Test
+  def testConvertToIn_LessThanThreshold_Int(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3")
+  }
+
+  @Test
+  def testConvertToIn_EqualsToThreshold_Int(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4")
+  }
+
+  @Test
+  def testConvertToIn_GreaterThanThreshold_Int(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5")
+  }
+
+  @Test
+  def testConvertToIn_LessThanThreshold_Double(): Unit = {
+    val where = (1 until 20).map(i => s"d = $i").mkString(" OR ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  @Test
+  def testConvertToIn_GreaterThanThreshold_Double(): Unit = {
+    val where = (1 until 21).map(i => s"d = $i").mkString(" OR ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  @Test
+  def testConvertToIn_WithOr1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1")
+  }
+
+  @Test
+  def testConvertToIn_WithOr2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR b = 1 OR a = 3 OR a = 4")
+  }
+
+  @Test
+  def testConvertToIn_WithAnd1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE (a = 1 OR a = 2 OR a = 3 OR a = 4) AND b = 1")
+  }
+
+  @Test
+  def testConvertToIn_WithAnd2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 AND b = 1")
+  }
+
+  @Test
+  def testConvertToNotIn_LessThanThreshold_Int(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3")
+  }
+
+  @Test
+  def testConvertToNotIn_EqualsToThreshold_Int(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4")
+  }
+
+  @Test
+  def testConvertToNotIn_GreaterThanThreshold_Int(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND a = 5")
+  }
+
+  @Test
+  def testConvertToNotIn_LessThanThreshold_Double(): Unit = {
+    val where = (1 until 20).map(i => s"d <> $i").mkString(" AND ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  @Test
+  def testConvertToNotIn_GreaterThanThreshold_Double(): Unit = {
+    val where = (1 until 21).map(i => s"d <> $i").mkString(" AND ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  @Test
+  def testConvertToNotIn_WithOr1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4) OR b =" +
+      " 1")
+  }
+
+  @Test
+  def testConvertToNotIn_WithOr2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 OR b = 1")
+  }
+
+  @Test
+  def testConvertToNotIn_WithOr3(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 OR a <> 2 OR a <> 3 OR a <> 4 OR b = 1")
+  }
+
+  @Test
+  def testConvertToNotIn_WithAnd1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND b = 1")
+  }
+
+  @Test
+  def testConvertToNotIn_WithAnd2(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 AND a <> 2  AND b = 1 AND a <> 3 AND a <> 4"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testConvertToInAndNotIn1(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1 " +
+      "OR (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)")
+  }
+
+  @Test
+  def testConvertToInAndNotIn2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE b = 1 OR a = 1 OR a = 2 OR a = 3 OR a = 4  " +
+      "AND (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)")
+  }
+
+  @Test
+  def testConvertToInAndNotIn3(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE b = 1 OR b = 2 OR (a <> 1 AND a <> 2 AND a <> 3 " +
+      "AND a <> 4 AND c = 1) OR b = 3 OR b = 4 OR c = 1")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.scala
new file mode 100644
index 0000000..15dd6ef
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.plan.optimize.program._
+import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[FlinkCalcMergeRule]].
+  */
+class FlinkCalcMergeRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "table_ref",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(FlinkBatchRuleSets.TABLE_REF_RULES)
+        .build())
+    programs.addLast(
+      "logical",
+      FlinkVolcanoProgramBuilder.newBuilder
+        .add(RuleSets.ofList(
+          FilterToCalcRule.INSTANCE,
+          ProjectToCalcRule.INSTANCE,
+          FlinkCalcMergeRule.INSTANCE,
+          FlinkLogicalCalc.CONVERTER,
+          FlinkLogicalTableSourceScan.CONVERTER
+        ))
+        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
+        .build())
+    val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+      .replaceBatchProgram(programs).build()
+    util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+    util.addTableSource[(Int, Int, String)]("MyTable", 'a, 'b, 'c)
+  }
+
+  @Test
+  def testCalcMergeWithSameDigest(): Unit = {
+    util.verifyPlan("SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b = a")
+  }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr1(): Unit = {
+    util.addFunction("random_udf", new NonDeterministicUdf)
+    val sqlQuery = "SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr2(): Unit = {
+    util.addFunction("random_udf", new NonDeterministicUdf)
+    val sqlQuery = "SELECT a FROM (SELECT a FROM MyTable) t WHERE random_udf(a) > 10"
+    util.verifyPlan(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.scala
new file mode 100644
index 0000000..7d7744b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.codegen.CodeGenException
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.junit.{Before, Test}
+
+import java.sql.Date
+
+/**
+  * Test for [[RewriteCoalesceRule]]: verifies plans of USING and NATURAL join queries.
+  */
+class RewriteCoalesceRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    // The batch program is replaced by a chain holding only the default rewrite rules.
+    val rewriteOnly = new FlinkChainedProgram[BatchOptimizeContext]()
+    rewriteOnly.addLast(
+      "default_rewrite",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(FlinkBatchRuleSets.DEFAULT_REWRITE_RULES)
+        .build())
+    val tableConfig = util.tableEnv.getConfig
+    val configBuilder = CalciteConfig.createBuilder(tableConfig.getCalciteConfig)
+    tableConfig.setCalciteConfig(configBuilder.replaceBatchProgram(rewriteOnly).build())
+
+    util.addTableSource[(Int, String, String, Int, Date, Double, Double, Int)](
+      "scott_emp", 'empno, 'ename, 'job, 'mgr, 'hiredate, 'sal, 'comm, 'deptno)
+    util.addTableSource[(Int, String, String)]("scott_dept", 'deptno, 'dname, 'loc)
+  }
+
+  /** LEFT JOIN ... USING (deptno) over two sub-queries, topped by ORDER BY and LIMIT. */
+  @Test
+  def testCalcite1018(): Unit = {
+    util.verifyPlan(
+      """
+        |select * from (select * from scott_emp) e left join (
+        |    select * from scott_dept d) using (deptno)
+        |    order by empno limit 10
+      """.stripMargin)
+  }
+
+  /** JOIN ... USING (deptno) whose left-side deptno is compared with the constant 10. */
+  @Test
+  def testCoalesceConstantReduce(): Unit = {
+    util.verifyPlan(
+      """
+        |select * from lateral (select * from scott_emp) as e
+        |    join (table scott_dept) using (deptno)
+        |    where e.deptno = 10
+      """.stripMargin)
+  }
+
+  /** USING join filtered by a correlated scalar sub-query that contains a USING join itself. */
+  // TODO: drop the expected exception once [FLINK-12371] is merged
+  @Test(expected = classOf[CodeGenException])
+  def testCalcite864_1(): Unit = {
+    util.verifyPlan(
+      """
+        |select *
+        |    from scott_emp as e
+        |    join scott_dept as d using (deptno)
+        |    where sal = (
+        |      select max(sal)
+        |      from scott_emp as e2
+        |      join scott_dept as d2 using (deptno)
+        |      where d2.deptno = d.deptno)
+      """.stripMargin)
+  }
+
+  /** USING join filtered by a correlated scalar sub-query over scott_dept alone. */
+  // TODO: drop the expected exception once [FLINK-12371] is merged
+  @Test(expected = classOf[CodeGenException])
+  def testCalcite864_3(): Unit = {
+    util.verifyPlan(
+      """
+        |select *
+        |    from scott_emp as e
+        |    join scott_dept as d using (deptno)
+        |    where d.dname = (
+        |      select max(dname)
+        |      from scott_dept as d2
+        |      where d2.deptno = d.deptno)
+      """.stripMargin)
+  }
+
+  /** NATURAL JOIN combined with an equality predicate on empno. */
+  @Test
+  def testNaturalJoinWithPredicates(): Unit = {
+    util.verifyPlan(
+      """
+        |select * from scott_dept natural join scott_emp where empno = 1
+      """.stripMargin)
+  }
+
+  /** NATURAL LEFT JOIN ordered by the deptno column of both inputs. */
+  @Test
+  def testNaturalJoinLeftOuter(): Unit = {
+    util.verifyPlan(
+      """
+        |SELECT * FROM scott_dept
+        |    natural left join scott_emp
+        |    order by scott_dept.deptno, scott_emp.deptno
+      """.stripMargin)
+  }
+
+  /** NATURAL RIGHT JOIN ordered by the deptno column of both inputs. */
+  @Test
+  def testNaturalJoinRightOuter(): Unit = {
+    util.verifyPlan(
+      """
+        |SELECT * FROM scott_dept
+        |    natural right join scott_emp
+        |    order by scott_dept.deptno, scott_emp.deptno
+      """.stripMargin)
+  }
+}