You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/13 01:24:47 UTC
[flink] branch master updated: [FLINK-12487] [table-planner-blink]
Introduce planner rules to rewrite expression and merge calc
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b47b591 [FLINK-12487] [table-planner-blink] Introduce planner rules to rewrite expression and merge calc
b47b591 is described below
commit b47b5910c16a423504171a2a29230ff1d8966214
Author: godfrey he <go...@163.com>
AuthorDate: Mon May 13 09:24:30 2019 +0800
[FLINK-12487] [table-planner-blink] Introduce planner rules to rewrite expression and merge calc
This closes #8411
---
.../flink/table/codegen/CodeGeneratorContext.scala | 2 +-
.../table/plan/rules/FlinkBatchRuleSets.scala | 23 +-
.../table/plan/rules/FlinkStreamRuleSets.scala | 24 +-
.../rules/logical/ConvertToNotInOrInRule.scala | 187 +++++++++
.../plan/rules/logical/FlinkCalcMergeRule.scala | 108 +++++
.../plan/rules/logical/RewriteCoalesceRule.scala | 237 +++++++++++
.../apache/flink/table/plan/batch/sql/CalcTest.xml | 4 +-
.../table/plan/batch/sql/agg/GroupingSetsTest.xml | 6 +-
.../rules/logical/ConvertToNotInOrInRuleTest.xml | 437 +++++++++++++++++++++
...nkAggregateExpandDistinctAggregatesRuleTest.xml | 8 +-
.../plan/rules/logical/FlinkCalcMergeRuleTest.xml | 80 ++++
.../plan/rules/logical/RewriteCoalesceRuleTest.xml | 164 ++++++++
.../plan/rules/logical/SplitAggregateRuleTest.xml | 36 +-
.../flink/table/plan/stream/sql/CalcTest.xml | 4 +-
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 124 +++---
.../table/plan/stream/sql/agg/GroupingSetsTest.xml | 4 +-
.../stream/sql/agg/IncrementalAggregateTest.xml | 63 ++-
.../rules/logical/ConvertToNotInOrInRuleTest.scala | 170 ++++++++
.../rules/logical/FlinkCalcMergeRuleTest.scala | 87 ++++
.../rules/logical/RewriteCoalesceRuleTest.scala | 144 +++++++
20 files changed, 1760 insertions(+), 152 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
index 83d29df..59cb2b3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
@@ -398,7 +398,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}
addReusableMember(
- s"final $setTypeTerm $fieldTerm = new $setTypeTerm(${elements.size})")
+ s"final $setTypeTerm $fieldTerm = new $setTypeTerm(${elements.size});")
elements.foreach { element =>
val content =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index e87b77e..22f13cc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -66,10 +66,22 @@ object FlinkBatchRuleSets {
)
/**
+ * RuleSet to rewrite coalesce to case when
+ */
+ private val REWRITE_COALESCE_RULES: RuleSet = RuleSets.ofList(
+ // rewrite coalesce to case when
+ RewriteCoalesceRule.FILTER_INSTANCE,
+ RewriteCoalesceRule.PROJECT_INSTANCE,
+ RewriteCoalesceRule.JOIN_INSTANCE,
+ RewriteCoalesceRule.CALC_INSTANCE
+ )
+
+ /**
* RuleSet to normalize plans for batch
*/
val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
- REDUCE_EXPRESSION_RULES.asScala ++
+ REWRITE_COALESCE_RULES.asScala ++
+ REDUCE_EXPRESSION_RULES.asScala ++
List(
// Transform window to LogicalWindowAggregate
BatchLogicalWindowAggregateRule.INSTANCE,
@@ -80,7 +92,8 @@ object FlinkBatchRuleSets {
//ensure intersect set operator have the same row type
new CoerceInputsRule(classOf[LogicalIntersect], false),
//ensure except set operator have the same row type
- new CoerceInputsRule(classOf[LogicalMinus], false)
+ new CoerceInputsRule(classOf[LogicalMinus], false),
+ ConvertToNotInOrInRule.INSTANCE
)).asJava)
/**
@@ -139,7 +152,9 @@ object FlinkBatchRuleSets {
// reorder sort and projection
ProjectSortTransposeRule.INSTANCE,
//removes constant keys from an Agg
- AggregateProjectPullUpConstantsRule.INSTANCE
+ AggregateProjectPullUpConstantsRule.INSTANCE,
+ // push project through a Union
+ ProjectSetOpTransposeRule.INSTANCE
)
val WINDOW_RULES: RuleSet = RuleSets.ofList(
@@ -197,7 +212,7 @@ object FlinkBatchRuleSets {
ProjectCalcMergeRule.INSTANCE,
FilterToCalcRule.INSTANCE,
ProjectToCalcRule.INSTANCE,
- CalcMergeRule.INSTANCE
+ FlinkCalcMergeRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 6ac853d..d6cdba8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -66,10 +66,22 @@ object FlinkStreamRuleSets {
)
/**
+ * RuleSet to rewrite coalesce to case when
+ */
+ private val REWRITE_COALESCE_RULES: RuleSet = RuleSets.ofList(
+ // rewrite coalesce to case when
+ RewriteCoalesceRule.FILTER_INSTANCE,
+ RewriteCoalesceRule.PROJECT_INSTANCE,
+ RewriteCoalesceRule.JOIN_INSTANCE,
+ RewriteCoalesceRule.CALC_INSTANCE
+ )
+
+ /**
* RuleSet to normalize plans for stream
*/
val DEFAULT_REWRITE_RULES: RuleSet = RuleSets.ofList((
- REDUCE_EXPRESSION_RULES.asScala ++
+ REWRITE_COALESCE_RULES.asScala ++
+ REDUCE_EXPRESSION_RULES.asScala ++
List(
StreamLogicalWindowAggregateRule.INSTANCE,
// slices a project into sections which contain window agg functions
@@ -82,7 +94,8 @@ object FlinkStreamRuleSets {
//ensure intersect set operator have the same row type
new CoerceInputsRule(classOf[LogicalIntersect], false),
//ensure except set operator have the same row type
- new CoerceInputsRule(classOf[LogicalMinus], false)
+ new CoerceInputsRule(classOf[LogicalMinus], false),
+ ConvertToNotInOrInRule.INSTANCE
)
).asJava)
@@ -187,7 +200,7 @@ object FlinkStreamRuleSets {
ProjectCalcMergeRule.INSTANCE,
FilterToCalcRule.INSTANCE,
ProjectToCalcRule.INSTANCE,
- CalcMergeRule.INSTANCE
+ FlinkCalcMergeRule.INSTANCE
)
/**
@@ -230,8 +243,9 @@ object FlinkStreamRuleSets {
// transform over window to topn node
FlinkLogicalRankRule.INSTANCE,
// split distinct aggregate to reduce data skew
- SplitAggregateRule.INSTANCE
- // TODO add flink calc merge rule
+ SplitAggregateRule.INSTANCE,
+ // merge calc after calc transpose
+ FlinkCalcMergeRule.INSTANCE
)
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRule.scala
new file mode 100644
index 0000000..4bdc3fc
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRule.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.`type`.InternalTypes
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlBinaryOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{AND, EQUALS, IN, NOT_EQUALS, NOT_IN, OR}
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Planner rule that collapses a cascade of comparisons against literals into a single
+  * [[IN]] or [[NOT_IN]] call.
+  *
+  * For example,
+  *  1. predicate: (x = 1 OR x = 2 OR x = 3 OR x = 4) AND y = 5
+  *     becomes:   x IN (1, 2, 3, 4) AND y = 5.
+  *  2. predicate: (x <> 1 AND x <> 2 AND x <> 3 AND x <> 4) AND y = 5
+  *     becomes:   x NOT IN (1, 2, 3, 4) AND y = 5.
+  */
+class ConvertToNotInOrInRule
+  extends RelOptRule(
+    operand(classOf[Filter], any),
+    "ConvertToNotInOrInRule") {
+
+  // these threshold values are set by OptimizableHashSet benchmark test on different type.
+  // minimum number of comparisons needed for non-float and non-double type
+  private val THRESHOLD: Int = 4
+  // minimum number of comparisons needed for float and double type
+  private val FRACTIONAL_THRESHOLD: Int = 20
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val filter: Filter = call.rel(0)
+    val condition = filter.getCondition
+
+    // first pass: equalities connected by OR -> IN
+    val inExpr = convertToNotInOrIn(call.builder(), condition, IN)
+    // second pass (on top of the first pass result, if any):
+    // inequalities connected by AND -> NOT_IN
+    val notInExpr = convertToNotInOrIn(call.builder(), inExpr.getOrElse(condition), NOT_IN)
+
+    // The NOT_IN result is built from the IN result, so it wins when present;
+    // otherwise fall back to the IN result. Nothing is emitted if neither pass converted.
+    notInExpr.orElse(inExpr).foreach { expr =>
+      call.transformTo(filter.copy(filter.getTraitSet, filter.getInput, expr))
+    }
+  }
+
+  /**
+    * Returns a condition decomposed by [[AND]] or [[OR]].
+    */
+  private def decomposedBy(rex: RexNode, operator: SqlBinaryOperator): Seq[RexNode] = {
+    operator match {
+      case OR => RelOptUtil.disjunctions(rex)
+      case AND => RelOptUtil.conjunctions(rex)
+    }
+  }
+
+  /**
+    * Connects the given predicates with [[AND]] or [[OR]].
+    */
+  private def composedBy(
+      builder: RelBuilder,
+      operator: SqlBinaryOperator,
+      nodes: Seq[RexNode]): RexNode = {
+    operator match {
+      case OR => builder.or(nodes)
+      case AND => builder.and(nodes)
+    }
+  }
+
+  /**
+    * Convert a cascade predicates to [[IN]] or [[NOT_IN]].
+    *
+    * @param builder    The [[RelBuilder]] to build the [[RexNode]].
+    * @param rex        The predicates to be converted.
+    * @param toOperator The target operator, either [[IN]] or [[NOT_IN]].
+    * @return The converted predicates, or None if nothing has been converted.
+    */
+  private def convertToNotInOrIn(
+      builder: RelBuilder,
+      rex: RexNode,
+      toOperator: SqlBinaryOperator): Option[RexNode] = {
+
+    // For example, when convert to [[IN]], fromOperator is [[EQUALS]].
+    // We convert a cascade of [[EQUALS]] to [[IN]].
+    // A connect operator is used to connect the fromOperator.
+    // A composed operator may contains sub [[IN]] or [[NOT_IN]].
+    val (fromOperator, connectOperator, composedOperator) = toOperator match {
+      case IN => (EQUALS, OR, AND)
+      case NOT_IN => (NOT_EQUALS, AND, OR)
+    }
+
+    // comparisons against literals, grouped by the digest of the non-literal side
+    val combineMap = new mutable.HashMap[String, mutable.ListBuffer[RexCall]]
+    // predicates of the result, in the order they are produced
+    val rexBuffer = new mutable.ArrayBuffer[RexNode]
+    var beenConverted = false
+
+    def groupOf(ref: RexNode): mutable.ListBuffer[RexCall] =
+      combineMap.getOrElseUpdate(ref.toString, mutable.ListBuffer[RexCall]())
+
+    // traverse decomposed predicates
+    decomposedBy(rex, connectOperator).foreach {
+      // candidate comparison: collect it under its reference
+      case call: RexCall if fromOperator == call.getOperator =>
+        (call.operands(0), call.operands(1)) match {
+          case (ref, _: RexLiteral) =>
+            groupOf(ref) += call
+          case (l: RexLiteral, ref) =>
+            // normalize "literal op ref" to "ref op literal"
+            groupOf(ref) += call.clone(call.getType, List(ref, l))
+          case _ =>
+            rexBuffer += call
+        }
+
+      // nested predicate connected by the other operator: convert each part recursively
+      case call: RexCall if composedOperator == call.getOperator =>
+        val newRex = decomposedBy(call, composedOperator).map { part =>
+          val converted = convertToNotInOrIn(builder, part, toOperator)
+          if (converted.isDefined) {
+            beenConverted = true
+          }
+          converted.getOrElse(part)
+        }
+        rexBuffer += composedBy(builder, composedOperator, newRex)
+
+      // anything else is kept untouched
+      case other =>
+        rexBuffer += other
+    }
+
+    combineMap.values.foreach { list =>
+      if (needConvert(list.toList)) {
+        val inputRef = list.head.getOperands.head
+        val values = list.map(_.getOperands.last)
+        rexBuffer += builder.getRexBuilder.makeCall(toOperator, List(inputRef) ++ values)
+        beenConverted = true
+      } else {
+        // too few comparisons: re-connect them the way they were
+        rexBuffer += composedBy(builder, connectOperator, list)
+      }
+    }
+
+    // return result only if something has been converted
+    if (beenConverted) {
+      Some(composedBy(builder, connectOperator, rexBuffer))
+    } else {
+      None
+    }
+  }
+
+  /**
+    * Returns true if the number of comparisons collected for one reference reaches the
+    * threshold of the reference's type.
+    */
+  private def needConvert(rexNodes: List[RexCall]): Boolean = {
+    val inputRef = rexNodes.head.getOperands.head
+    val threshold = FlinkTypeFactory.toInternalType(inputRef.getType) match {
+      case InternalTypes.FLOAT | InternalTypes.DOUBLE => FRACTIONAL_THRESHOLD
+      case _ => THRESHOLD
+    }
+    rexNodes.size >= threshold
+  }
+}
+
+object ConvertToNotInOrInRule {
+  val INSTANCE = new ConvertToNotInOrInRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
new file mode 100644
index 0000000..8deddfb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRule.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.plan.util.FlinkRexUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.{Calc, RelFactories}
+import org.apache.calcite.rex.{RexOver, RexProgramBuilder, RexUtil}
+import org.apache.calcite.tools.RelBuilderFactory
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that merges a [[Calc]] onto a [[Calc]].
+  *
+  * <p>The resulting [[Calc]] has the same project list as the upper [[Calc]],
+  * but expressed in terms of the lower [[Calc]]'s inputs.
+  *
+  * <p>This rule is copied from Calcite's [[org.apache.calcite.rel.rules.CalcMergeRule]].
+  *
+  * Modification:
+  * - Condition in the merged program will be simplified if it exists.
+  * - Don't merge calcs which contain non-deterministic expr
+  */
+class FlinkCalcMergeRule(relBuilderFactory: RelBuilderFactory) extends RelOptRule(
+  operand(classOf[Calc],
+    operand(classOf[Calc], any)),
+  relBuilderFactory,
+  "FlinkCalcMergeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val topCalc: Calc = call.rel(0)
+    val bottomCalc: Calc = call.rel(1)
+    val topProgram = topCalc.getProgram
+
+    // Don't merge a calc which contains windowed aggregates onto a
+    // calc. That would effectively be pushing a windowed aggregate down
+    // through a filter.
+    val topHasOver = RexOver.containsOver(topProgram)
+
+    // Don't merge Calcs which contain non-deterministic expr
+    !topHasOver &&
+      topProgram.getExprList.forall(RexUtil.isDeterministic) &&
+      bottomCalc.getProgram.getExprList.forall(RexUtil.isDeterministic)
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val topCalc: Calc = call.rel(0)
+    val bottomCalc: Calc = call.rel(1)
+
+    val topProgram = topCalc.getProgram
+    val rexBuilder = topCalc.getCluster.getRexBuilder
+    // Merge the programs together.
+    val mergedProgram = RexProgramBuilder.mergePrograms(
+      topCalc.getProgram, bottomCalc.getProgram, rexBuilder)
+    require(mergedProgram.getOutputRowType eq topProgram.getOutputRowType)
+
+    // Simplify the merged condition, if there is one. The program is only rebuilt
+    // when simplification actually changed the condition's string form.
+    val newMergedProgram = Option(mergedProgram.getCondition) match {
+      case None => mergedProgram
+      case Some(conditionRef) =>
+        val condition = mergedProgram.expandLocalRef(conditionRef)
+        val simplifiedCondition = FlinkRexUtil.simplify(rexBuilder, condition)
+        if (simplifiedCondition.toString != condition.toString) {
+          val programBuilder = RexProgramBuilder.forProgram(mergedProgram, rexBuilder, true)
+          programBuilder.clearCondition()
+          programBuilder.addCondition(simplifiedCondition)
+          programBuilder.getProgram(true)
+        } else {
+          mergedProgram
+        }
+    }
+
+    val newCalc = topCalc.copy(topCalc.getTraitSet, bottomCalc.getInput, newMergedProgram)
+    if (newCalc.getDigest == bottomCalc.getDigest) {
+      // newCalc is equivalent to bottomCalc,
+      // which means that topCalc
+      // must be trivial. Take it out of the game.
+      call.getPlanner.setImportance(topCalc, 0.0)
+    }
+    call.transformTo(newCalc)
+  }
+
+}
+
+object FlinkCalcMergeRule {
+  val INSTANCE = new FlinkCalcMergeRule(RelFactories.LOGICAL_BUILDER)
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRule.scala
new file mode 100644
index 0000000..bedfa53
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRule.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Calc, Filter, Join, Project}
+import org.apache.calcite.rel.logical.{LogicalCalc, LogicalFilter, LogicalJoin, LogicalProject}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Collection of planner rules that transform `Coalesce` to `Case When` RexNode trees.
+  *
+  * <p>Currently this is only used for natural join, for explicit Coalesce
+  * Calcite already replace it with Case When.
+  *
+  * <p>There are four transformation contexts:
+  * <ul>
+  * <li>Project project list
+  * <li>Join condition
+  * <li>Filter condition
+  * <li>Calc expression list
+  * </ul>
+  *
+  * @param clazz       class of the relational node matched by the rule
+  * @param description description of the rule
+  */
+abstract class RewriteCoalesceRule[T <: RelNode](
+    clazz: Class[T],
+    description: String)
+  extends RelOptRule(
+    operand(clazz, any),
+    description) {
+
+  /**
+    * Shuttle that rewrites `COALESCE(a, b, ..., z)` to
+    * `CASE WHEN a IS NOT NULL THEN a WHEN b IS NOT NULL THEN b ... ELSE z END`.
+    * A `COALESCE` with a single operand is replaced by that operand.
+    */
+  private class CoalesceToCaseShuttle(rexBuilder: RexBuilder) extends RexShuttle {
+    override def visitCall(call: RexCall): RexNode = {
+      call.getKind match {
+        case SqlKind.COALESCE =>
+          val operands = call.getOperands
+          // Every operand is visited before it is used, so that a COALESCE nested
+          // inside an operand is rewritten in the same pass instead of being copied
+          // verbatim into the generated CASE.
+          if (operands.size == 1) {
+            operands.head.accept(this)
+          } else {
+            val args: ImmutableList.Builder[RexNode] = ImmutableList.builder()
+            Util.skipLast(operands).foreach { operand =>
+              val rewritten = operand.accept(this)
+              // WHEN <operand> IS NOT NULL THEN <operand>
+              args.add(rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, rewritten), rewritten)
+            }
+            // ELSE <last operand>
+            args.add(operands.last.accept(this))
+            rexBuilder.makeCall(SqlStdOperatorTable.CASE, args.build)
+          }
+        case _ => super.visitCall(call)
+      }
+    }
+  }
+
+  /**
+    * Returns the given expression with its `Coalesce` calls rewritten to `Case When`.
+    *
+    * @param input      the expression to rewrite
+    * @param rexBuilder the builder used to create the new calls
+    */
+  protected def replace(input: RexNode,
+      rexBuilder: RexBuilder): RexNode = {
+    val shuttle = new CoalesceToCaseShuttle(rexBuilder)
+    input.accept(shuttle)
+  }
+
+  /**
+    * Returns true if the given expression contains a `Coalesce` call.
+    */
+  protected def existsCoalesce(rexNode: RexNode): Boolean = {
+    class CoalesceFinder extends RexVisitorImpl[Unit](true) {
+      var found = false
+
+      override def visitCall(call: RexCall): Unit = {
+        call.getKind match {
+          case SqlKind.COALESCE => found = true
+          case _ => super.visitCall(call)
+        }
+      }
+
+      def isFound: Boolean = found
+    }
+    val finder = new CoalesceFinder
+    rexNode.accept(finder)
+    finder.isFound
+  }
+
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in filter condition to `Case When`.
+  */
+class FilterRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalFilter],
+    "FilterRewriteCoalesceRule") {
+
+  /** Matches only filters whose condition contains a `Coalesce` call. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    existsCoalesce(call.rel[Filter](0).getCondition)
+  }
+
+  /** Replaces the matched filter by one with the rewritten condition on the same input. */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val filter: Filter = call.rel(0)
+    val relBuilder: RelBuilder = call.builder()
+    // The whole condition is rewritten at once; the resulting filter is matched by this
+    // rule again only if a Coalesce call is still left in its condition.
+    val newCondition = replace(filter.getCondition, relBuilder.getRexBuilder)
+    relBuilder.push(filter.getInput)
+    relBuilder.filter(newCondition)
+    call.transformTo(relBuilder.build())
+  }
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in project list to `Case When`.
+  */
+class ProjectRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalProject],
+    "ProjectRewriteCoalesceRule") {
+
+  /** Matches only projects with at least one expression that contains a `Coalesce` call. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val prj: Project = call.rel(0)
+    prj.getProjects.exists(p => existsCoalesce(p))
+  }
+
+  /** Replaces the matched project by one with the rewritten expressions on the same input. */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val prj: Project = call.rel(0)
+    val relBuilder: RelBuilder = call.builder()
+    val rexBuilder: RexBuilder = relBuilder.getRexBuilder
+    // All project expressions are rewritten at once; the resulting project is matched by
+    // this rule again only if a Coalesce call is still left in one of its expressions.
+    val newProjects = prj.getProjects.map(p => replace(p, rexBuilder))
+    // The field names of the original project are passed explicitly: without them the
+    // builder derives the names from the expressions, and a rewritten (non input-ref)
+    // expression would lose its original name.
+    val newProject = relBuilder
+      .push(prj.getInput)
+      .project(newProjects, prj.getRowType.getFieldNames)
+      .build()
+    call.transformTo(newProject)
+  }
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in join condition to `Case When`.
+  */
+class JoinRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalJoin],
+    "JoinRewriteCoalesceRule") {
+
+  /** Matches only joins whose condition contains a `Coalesce` call. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    existsCoalesce(call.rel[Join](0).getCondition)
+  }
+
+  /** Replaces the matched join by a copy that only differs in the rewritten condition. */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val join: Join = call.rel(0)
+    val rexBuilder: RexBuilder = call.builder().getRexBuilder
+    // The whole condition is rewritten at once; the resulting join is matched by this
+    // rule again only if a Coalesce call is still left in its condition.
+    val newCondition = replace(join.getCondition, rexBuilder)
+    call.transformTo(
+      join.copy(
+        join.getTraitSet,
+        newCondition,
+        join.getLeft,
+        join.getRight,
+        join.getJoinType,
+        join.isSemiJoinDone))
+  }
+}
+
+/**
+  * Planner rule that rewrites `Coalesce` in calc expression list to `Case When`.
+  */
+class CalcRewriteCoalesceRule extends
+  RewriteCoalesceRule(
+    classOf[LogicalCalc],
+    "CalcRewriteCoalesceRule") {
+
+  /** Matches only calcs whose program contains an expression with a `Coalesce` call. */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: Calc = call.rel(0)
+    val program = calc.getProgram
+    val exprList = program.getExprList
+    exprList.exists(p => existsCoalesce(p))
+  }
+
+  /**
+    * Replaces the matched calc by one whose program computes the same projections and
+    * condition with `Coalesce` rewritten to `Case When`.
+    */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: Calc = call.rel(0)
+    val rexBuilder: RexBuilder = calc.getCluster.getRexBuilder
+    val program = calc.getProgram
+    val builder: RexProgramBuilder = new RexProgramBuilder(
+      calc.getInput.getRowType, rexBuilder)
+
+    // The projections and the condition are expanded to expressions over the input fields
+    // before they are rewritten. The expressions of the program must not be rewritten and
+    // re-registered one by one: the rewrite adds sub-expressions (the IS NOT NULL calls),
+    // which shifts the positions in the new program, so the local references inside the
+    // following expressions would no longer point to the expressions they were created for.
+    if (program.getCondition != null) {
+      val condition = program.expandLocalRef(program.getCondition)
+      builder.addCondition(replace(condition, rexBuilder))
+    }
+    val fieldNames = program.getOutputRowType.getFieldNames
+    program.getProjectList.zipWithIndex.foreach {
+      case (projectRef, idx) =>
+        val projectExpr = program.expandLocalRef(projectRef)
+        // keep the name of the output field
+        builder.addProject(replace(projectExpr, rexBuilder), fieldNames.get(idx))
+    }
+
+    val newCalc = calc.copy(calc.getTraitSet, calc.getInput, builder.getProgram)
+    call.transformTo(newCalc)
+  }
+}
+
+/**
+  * Shared instances of the rules that rewrite `Coalesce` to `Case When`,
+  * one per kind of relational node.
+  */
+object RewriteCoalesceRule {
+  val FILTER_INSTANCE: FilterRewriteCoalesceRule = new FilterRewriteCoalesceRule()
+  val PROJECT_INSTANCE: ProjectRewriteCoalesceRule = new ProjectRewriteCoalesceRule()
+  val JOIN_INSTANCE: JoinRewriteCoalesceRule = new JoinRewriteCoalesceRule()
+  val CALC_INSTANCE: CalcRewriteCoalesceRule = new CalcRewriteCoalesceRule()
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
index 48faff1..d22ee5e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
@@ -103,7 +103,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(OR(=(b, 1), =(b, 3), =(b, 4), =(b, 5), =(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -210,7 +210,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c], where=[OR(AND(<>(b, 1), <>(b, 3), <>(b, 4), <>(b, 5), <>(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, c], where=[OR(NOT IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml
index ef039af..07fc423 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/agg/GroupingSetsTest.xml
@@ -728,10 +728,10 @@ LogicalProject(c=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[c])
-+- SortLimit(orderBy=[$f2 ASC, c ASC], offset=[0], fetch=[200], global=[true])
++- SortLimit(orderBy=[EXPR$1 ASC, c ASC], offset=[0], fetch=[200], global=[true])
+- Exchange(distribution=[single])
- +- SortLimit(orderBy=[$f2 ASC, c ASC], offset=[0], fetch=[200], global=[false])
- +- Calc(select=[deptno, c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS $f2])
+ +- SortLimit(orderBy=[EXPR$1 ASC, c ASC], offset=[0], fetch=[200], global=[false])
+ +- Calc(select=[c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS EXPR$1])
+- HashAggregate(isMerge=[true], groupBy=[deptno, $e], select=[deptno, $e, Final_COUNT(count1$0) AS c])
+- Exchange(distribution=[hash[deptno, $e]])
+- LocalHashAggregate(groupBy=[deptno, $e], select=[deptno, $e, Partial_COUNT(*) AS count1$0])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.xml
new file mode 100644
index 0000000..cf8dae6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.xml
@@ -0,0 +1,437 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testConvertToIn_EqualsToThreshold_Int">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[IN($0, 1, 2, 3, 4)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_GreaterThanThreshold_Double">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE d = 1 OR d = 2 OR d = 3 OR d = 4 OR d = 5 OR d = 6 OR d = 7 OR d = 8 OR d = 9 OR d = 10 OR d = 11 OR d = 12 OR d = 13 OR d = 14 OR d = 15 OR d = 16 OR d = 17 OR d = 18 OR d = 19 OR d = 20]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($3, 1), =($3, 2), =($3, 3), =($3, 4), =($3, 5), =($3, 6), =($3, 7), =($3, 8), =($3, 9), =($3, 10), =($3, 11), =($3, 12), =($3, 13), =($3, 14), =($3, 15), =($3, 16), =($3, 17), =($3, 18), =($3, 19), =($3, 20))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[IN($3, 1:DOUBLE, 2:DOUBLE, 3:DOUBLE, 4:DOUBLE, 5:DOUBLE, 6:DOUBLE, 7:DOUBLE, 8:DOUBLE, 9:DOUBLE, 10:DOUBLE, 11:DOUBLE, 12:DOUBLE, 13:DOUBLE, 14:DOUBLE, 15:DOUBLE, 16:DOUBLE, 17:DOUBLE, 18:DOUBLE, 19:DOUBLE, 20:DOUBLE)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_GreaterThanThreshold_Int">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4), =($0, 5))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[IN($0, 1, 2, 3, 4, 5)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_LessThanThreshold_Double">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE d = 1 OR d = 2 OR d = 3 OR d = 4 OR d = 5 OR d = 6 OR d = 7 OR d = 8 OR d = 9 OR d = 10 OR d = 11 OR d = 12 OR d = 13 OR d = 14 OR d = 15 OR d = 16 OR d = 17 OR d = 18 OR d = 19]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($3, 1), =($3, 2), =($3, 3), =($3, 4), =($3, 5), =($3, 6), =($3, 7), =($3, 8), =($3, 9), =($3, 10), =($3, 11), =($3, 12), =($3, 13), =($3, 14), =($3, 15), =($3, 16), =($3, 17), =($3, 18), =($3, 19))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($3, 1), =($3, 2), =($3, 3), =($3, 4), =($3, 5), =($3, 6), =($3, 7), =($3, 8), =($3, 9), =($3, 10), =($3, 11), =($3, 12), =($3, 13), =($3, 14), =($3, 15), =($3, 16), =($3, 17), =($3, 18), =($3, 19))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_LessThanThreshold_Int">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_WithAnd1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE (a = 1 OR a = 2 OR a = 3 OR a = 4) AND b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4)), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(IN($0, 1, 2, 3, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_WithAnd2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 AND b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), =($1, 1)))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), =($1, 1)))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_WithOr1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), IN($0, 1, 2, 3, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToIn_WithOr2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR b = 1 OR a = 3 OR a = 4]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($1, 1), =($0, 3), =($0, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), IN($0, 1, 2, 3, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToInAndNotIn1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1 OR (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($0, 1), =($0, 2), =($0, 3), =($0, 4), =($1, 1), AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(NOT IN($0, 1, 2, 3, 4), =($1, 1), IN($0, 1, 2, 3, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToInAndNotIn2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE b = 1 OR a = 1 OR a = 2 OR a = 3 OR a = 4 AND (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), =($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), <>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), =($0, 1), =($0, 2), =($0, 3), AND(=($0, 4), NOT IN($0, 1, 2, 3, 4)))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToInAndNotIn3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE b = 1 OR b = 2 OR (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND c = 1) OR b = 3 OR b = 4 OR c = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(=($1, 1), =($1, 2), AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($2, 1)), =($1, 3), =($1, 4), =($2, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(=($2, 1), NOT IN($0, 1, 2, 3, 4)), IN($1, 1:BIGINT, 2:BIGINT, 3:BIGINT, 4:BIGINT), =($2, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_EqualsToThreshold_Int">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[NOT IN($0, 1, 2, 3, 4)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_GreaterThanThreshold_Double">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE d <> 1 AND d <> 2 AND d <> 3 AND d <> 4 AND d <> 5 AND d <> 6 AND d <> 7 AND d <> 8 AND d <> 9 AND d <> 10 AND d <> 11 AND d <> 12 AND d <> 13 AND d <> 14 AND d <> 15 AND d <> 16 AND d <> 17 AND d <> 18 AND d <> 19 AND d <> 20]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($3, 1), <>($3, 2), <>($3, 3), <>($3, 4), <>($3, 5), <>($3, 6), <>($3, 7), <>($3, 8), <>($3, 9), <>($3, 10), <>($3, 11), <>($3, 12), <>($3, 13), <>($3, 14), <>($3, 15), <>($3, 16), <>($3, 17), <>($3, 18), <>($3, 19), <>($3, 20))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[NOT IN($3, 1:DOUBLE, 2:DOUBLE, 3:DOUBLE, 4:DOUBLE, 5:DOUBLE, 6:DOUBLE, 7:DOUBLE, 8:DOUBLE, 9:DOUBLE, 10:DOUBLE, 11:DOUBLE, 12:DOUBLE, 13:DOUBLE, 14:DOUBLE, 15:DOUBLE, 16:DOUBLE, 17:DOUBLE, 18:DOUBLE, 19:DOUBLE, 20:DOUBLE)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_GreaterThanThreshold_Int">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND a = 5]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($0, 5))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(=($0, 5), NOT IN($0, 1, 2, 3, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_LessThanThreshold_Double">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE d <> 1 AND d <> 2 AND d <> 3 AND d <> 4 AND d <> 5 AND d <> 6 AND d <> 7 AND d <> 8 AND d <> 9 AND d <> 10 AND d <> 11 AND d <> 12 AND d <> 13 AND d <> 14 AND d <> 15 AND d <> 16 AND d <> 17 AND d <> 18 AND d <> 19]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($3, 1), <>($3, 2), <>($3, 3), <>($3, 4), <>($3, 5), <>($3, 6), <>($3, 7), <>($3, 8), <>($3, 9), <>($3, 10), <>($3, 11), <>($3, 12), <>($3, 13), <>($3, 14), <>($3, 15), <>($3, 16), <>($3, 17), <>($3, 18), <>($3, 19))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($3, 1), <>($3, 2), <>($3, 3), <>($3, 4), <>($3, 5), <>($3, 6), <>($3, 7), <>($3, 8), <>($3, 9), <>($3, 10), <>($3, 11), <>($3, 12), <>($3, 13), <>($3, 14), <>($3, 15), <>($3, 16), <>($3, 17), <>($3, 18), <>($3, 19))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_LessThanThreshold_Int">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_WithAnd1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(=($1, 1), NOT IN($0, 1, 2, 3, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_WithAnd2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND b = 1 AND a <> 3 AND a <> 4]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(<>($0, 1), <>($0, 2), =($1, 1), <>($0, 3), <>($0, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[AND(=($1, 1), NOT IN($0, 1, 2, 3, 4))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_WithOr1">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4) OR b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(NOT IN($0, 1, 2, 3, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_WithOr2">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 OR b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(AND(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4)), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(NOT IN($0, 1, 2, 3, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testConvertToNotIn_WithOr3">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable WHERE a <> 1 OR a <> 2 OR a <> 3 OR a <> 4 OR b = 1]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4])
++- LogicalFilter(condition=[OR(<>($0, 1), <>($0, 2), <>($0, 3), <>($0, 4), =($1, 1))])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
index 7d2de86..3bbbd71 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
@@ -54,8 +54,8 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[EXPR$0, EXPR$1])
-+- FlinkLogicalAggregate(group=[{1}], EXPR$0=[SUM($0) FILTER $3], EXPR$1=[COUNT($2) FILTER $4])
- +- FlinkLogicalCalc(select=[a, b, c, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 4:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 4:BIGINT), 4) AS $g_4])
++- FlinkLogicalAggregate(group=[{0}], EXPR$0=[SUM($1) FILTER $3], EXPR$1=[COUNT($2) FILTER $4])
+ +- FlinkLogicalCalc(select=[b, a, c, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2])
+- FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
+- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[4]}])
+- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
@@ -99,8 +99,8 @@ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
<Resource name="planAfter">
<![CDATA[
FlinkLogicalCalc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
-+- FlinkLogicalAggregate(group=[{2}], EXPR$0=[COUNT($0) FILTER $5], EXPR$1=[SUM($1) FILTER $6], EXPR$2=[MIN($3) FILTER $7], EXPR$3=[MIN($4) FILTER $7])
- +- FlinkLogicalCalc(select=[a, b, c, EXPR$2, EXPR$3, =(CASE(=($e, 2:BIGINT), 2:BIGINT, =($e, 4:BIGINT), 4:BIGINT, 6:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 2:BIGINT, =($e, 4:BIGINT), 4:BIGINT, 6:BIGINT), 4) AS $g_4, =(CASE(=($e, 2:BIGINT), 2:BIGINT, =($e, 4:BIGINT), 4:BIGINT, 6:BIGINT), 6) AS $g_6])
++- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT($1) FILTER $5], EXPR$1=[SUM($2) FILTER $6], EXPR$2=[MIN($3) FILTER $7], EXPR$3=[MIN($4) FILTER $7])
+ +- FlinkLogicalCalc(select=[c, a, b, EXPR$2, EXPR$3, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
+- FlinkLogicalAggregate(group=[{0, 1, 2, 3}], EXPR$2=[MAX($4)], EXPR$3=[MIN($4)])
+- FlinkLogicalExpand(projects=[{a=[$0], b=[null], c=[$2], $e=[2], a_0=[$0]}, {a=[null], b=[$1], c=[$2], $e=[4], a_0=[$0]}, {a=[null], b=[null], c=[$2], $e=[6], a_0=[$0]}])
+- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.xml
new file mode 100644
index 0000000..51bca73
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCalcMergeWithSameDigest">
+ <Resource name="sql">
+ <![CDATA[SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b = a]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[=($1, $0)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[=($0, $1)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, b], where=[=(a, b)])
++- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+ <Resource name="sql">
+ <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($0)])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, a1], where=[>(a1, 10)])
++- FlinkLogicalCalc(select=[a, random_udf(a) AS a1])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM (SELECT a FROM MyTable) t WHERE random_udf(a) > 10]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[>(random_udf($0), 10)])
+ +- LogicalProject(a=[$0])
+ +- LogicalTableScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a])
++- FlinkLogicalCalc(select=[a], where=[>(random_udf(a), 10)])
+ +- FlinkLogicalCalc(select=[a])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.xml
new file mode 100644
index 0000000..8737c22
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCalcite1018">
+ <Resource name="sql">
+ <![CDATA[
+select * from (select * from scott_emp) e left join (
+ select * from scott_dept d) using (deptno)
+ order by empno limit 10
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject(deptno=[COALESCE($7, $8)], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
+ +- LogicalJoin(condition=[=($7, $8)], joinType=[left])
+ :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+ : +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+ +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+ +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
++- LogicalProject($f0=[CASE(IS NOT NULL($7), $7, $8)], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
+ +- LogicalJoin(condition=[=($7, $8)], joinType=[left])
+ :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+ : +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+ +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+ +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCoalesceConstantReduce">
+ <Resource name="sql">
+ <![CDATA[
+select * from lateral (select * from scott_emp) as e
+ join (table scott_dept) using (deptno)
+ where e.deptno = 10
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(deptno=[COALESCE($7, $8)], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
++- LogicalFilter(condition=[=($7, 10)])
+ +- LogicalJoin(condition=[=($7, $8)], joinType=[inner])
+ :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+ : +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+ +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+ +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject($f0=[CAST(10):INTEGER], empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], dname=[$9], loc=[$10])
++- LogicalFilter(condition=[=($7, 10)])
+ +- LogicalJoin(condition=[=($7, $8)], joinType=[inner])
+ :- LogicalProject(empno=[$0], ename=[$1], job=[$2], mgr=[$3], hiredate=[$4], sal=[$5], comm=[$6], deptno=[$7])
+ : +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+ +- LogicalProject(deptno=[$0], dname=[$1], loc=[$2])
+ +- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNaturalJoinLeftOuter">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM scott_dept
+ natural left join scott_emp
+ order by scott_dept.deptno, scott_emp.deptno
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+ +- LogicalProject(deptno=[COALESCE($0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno0=[$0], deptno1=[$10])
+ +- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+ :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+ +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+ +- LogicalProject($f0=[CASE(IS NOT NULL($0), $0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno=[$0], deptno0=[$10])
+ +- LogicalJoin(condition=[=($0, $10)], joinType=[left])
+ :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+ +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNaturalJoinRightOuter">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM scott_dept
+ natural right join scott_emp
+ order by scott_dept.deptno, scott_emp.deptno
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+ +- LogicalProject(deptno=[COALESCE($0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno0=[$0], deptno1=[$10])
+ +- LogicalJoin(condition=[=($0, $10)], joinType=[right])
+ :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+ +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject(deptno=[$0], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalSort(sort0=[$10], sort1=[$11], dir0=[ASC-nulls-first], dir1=[ASC-nulls-first])
+ +- LogicalProject($f0=[CASE(IS NOT NULL($0), $0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9], deptno=[$0], deptno0=[$10])
+ +- LogicalJoin(condition=[=($0, $10)], joinType=[right])
+ :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+ +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNaturalJoinWithPredicates">
+ <Resource name="sql">
+ <![CDATA[
+select * from scott_dept natural join scott_emp where empno = 1
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(deptno=[COALESCE($0, $10)], dname=[$1], loc=[$2], empno=[$3], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalFilter(condition=[=($3, 1)])
+ +- LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+ :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+ +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LogicalProject($f0=[CASE(IS NOT NULL($0), $0, $10)], dname=[$1], loc=[$2], empno=[CAST(1):INTEGER], ename=[$4], job=[$5], mgr=[$6], hiredate=[$7], sal=[$8], comm=[$9])
++- LogicalFilter(condition=[=($3, 1)])
+ +- LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+ :- LogicalTableScan(table=[[scott_dept, source: [TestTableSource(deptno, dname, loc)]]])
+ +- LogicalTableScan(table=[[scott_emp, source: [TestTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno)]]])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml
index 87b17ae..78066af 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/SplitAggregateRuleTest.xml
@@ -41,9 +41,8 @@ FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[SU
+- FlinkLogicalAggregate(group=[{0, 4}], agg#0=[COUNT(DISTINCT $1) FILTER $5], agg#1=[SUM($1) FILTER $6], agg#2=[SUM($1) FILTER $7])
+- FlinkLogicalCalc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
- +- FlinkLogicalCalc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- FlinkLogicalCalc(select=[a, b, IS TRUE(<>(b, 2)) AS $f2, IS TRUE(<>(b, 5)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -90,8 +89,7 @@ FlinkLogicalJoin(condition=[=($1, $4)], joinType=[inner])
: +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
: +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
: +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
-: +- FlinkLogicalCalc(select=[a, b])
-: +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+: +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -112,8 +110,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)])
FlinkLogicalAggregate(group=[{}], agg#0=[$SUM0($1)])
+- FlinkLogicalAggregate(group=[{1}], agg#0=[COUNT(DISTINCT $0)])
+- FlinkLogicalCalc(select=[c, MOD(HASH_CODE(c), 100) AS $f1])
- +- FlinkLogicalCalc(select=[c])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -135,8 +132,7 @@ FlinkLogicalAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[SUM($3)])
+- FlinkLogicalCalc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- FlinkLogicalCalc(select=[a, b])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -211,8 +207,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)])
FlinkLogicalAggregate(group=[{}], agg#0=[$SUM0($1)])
+- FlinkLogicalAggregate(group=[{1}], agg#0=[COUNT(DISTINCT $0)])
+- FlinkLogicalCalc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- FlinkLogicalCalc(select=[c])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -256,15 +251,13 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
</Resource>
<Resource name="planAfter">
<![CDATA[
-FlinkLogicalCalc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- FlinkLogicalCalc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
- +- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
- +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
- +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
- +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
- +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- FlinkLogicalCalc(select=[a, b])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+FlinkLogicalCalc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], agg#1=[SUM($3)], agg#2=[$SUM0($4)], agg#3=[$SUM0($5)])
+ +- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1) FILTER $3], agg#1=[SUM($1) FILTER $4], agg#2=[$SUM0($1) FILTER $4], agg#3=[COUNT($1) FILTER $4])
+ +- FlinkLogicalCalc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+ +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+ +- FlinkLogicalCalc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -284,8 +277,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)])
FlinkLogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])
+- FlinkLogicalAggregate(group=[{0, 2}], agg#0=[COUNT(DISTINCT $1)])
+- FlinkLogicalCalc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- FlinkLogicalCalc(select=[a, c])
- +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- FlinkLogicalTableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml
index 48faff1..d22ee5e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/CalcTest.xml
@@ -103,7 +103,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(OR(=(b, 1), =(b, 3), =(b, 4), =(b, 5), =(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE") AS c], where=[AND(IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
@@ -210,7 +210,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, b, c], where=[OR(AND(<>(b, 1), <>(b, 3), <>(b, 4), <>(b, 5), <>(b, 6)), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+Calc(select=[a, b, c], where=[OR(NOT IN(b, 1, 3, 4, 5, 6), =(c, _UTF-16LE'xx':VARCHAR(65536) CHARACTER SET "UTF-16LE"))])
+- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
index 728c121..4d869c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -100,9 +100,8 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- Exchange(distribution=[hash[a, $f4]])
+- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
- +- Calc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -135,9 +134,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
+- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
- +- Calc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -239,14 +237,12 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[b, $f2]], updateAsRetraction=[true], accMode=[AccRetract])
+- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[b, c], updateAsRetraction=[true], accMode=[AccRetract])
- +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, MAX($f3) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[AccRetract])
- +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, MAX(b) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, MAX($f3) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[AccRetract])
+ +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) AS $f2_0, MAX(b) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -279,16 +275,14 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[b, $f2]], updateAsRetraction=[true], accMode=[AccRetract])
+- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[b, c], updateAsRetraction=[true], accMode=[AccRetract])
- +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
- +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, MAX($f3) AS max$1, COUNT_RETRACT(*) AS count1$2], updateAsRetraction=[true], accMode=[Acc])
- +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, MAX(max$1) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+ +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, MAX($f3) AS max$1, COUNT_RETRACT(*) AS count1$2], updateAsRetraction=[true], accMode=[Acc])
+ +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, MAX(max$1) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -492,8 +486,7 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f2_0) AS $f0, SU
+- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- Calc(select=[a, b])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -519,8 +512,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
+- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- Calc(select=[a, b])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -811,8 +803,7 @@ GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f1_0) AS $f0])
+- GroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(DISTINCT c) AS $f1_0])
+- Exchange(distribution=[hash[$f1]])
+- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- Calc(select=[c])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -836,8 +827,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT(sum$0) AS $
+- Exchange(distribution=[hash[$f1]])
+- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- Calc(select=[c])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1004,17 +994,15 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- Calc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
- +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
- +- Exchange(distribution=[hash[a]])
- +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
- +- Exchange(distribution=[hash[a, $f2]])
- +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
- +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- Calc(select=[a, b])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS $f1, SUM_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS $f3, $SUM0_RETRACT($f5) AS $f4])
+ +- Exchange(distribution=[hash[a]])
+ +- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT b) FILTER $g_0 AS $f2_0, SUM(b) FILTER $g_1 AS $f3, $SUM0(b) FILTER $g_1 AS $f4, COUNT(b) FILTER $g_1 AS $f5])
+ +- Exchange(distribution=[hash[a, $f2]])
+ +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+ +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1031,19 +1019,17 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- Calc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
- +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, SUM_RETRACT((sum$1, count$2)) AS $f2, $SUM0_RETRACT(sum$3) AS $f3, $SUM0_RETRACT(sum$4) AS $f4])
- +- Exchange(distribution=[hash[a]])
- +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
- +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, SUM(sum$1) AS $f3, $SUM0(sum$2) AS $f4, COUNT(count$3) AS $f5])
- +- Exchange(distribution=[hash[a, $f2]])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
- +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- Calc(select=[a, b])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(sum$0) AS $f1, SUM_RETRACT((sum$1, count$2)) AS $f2, $SUM0_RETRACT(sum$3) AS $f3, $SUM0_RETRACT(sum$4) AS $f4])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($f2_0) AS sum$0, SUM_RETRACT($f3) AS (sum$1, count$2), $SUM0_RETRACT($f4) AS sum$3, $SUM0_RETRACT($f5) AS sum$4, COUNT_RETRACT(*) AS count1$5])
+ +- GlobalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 count$0) AS $f2_0, SUM(sum$1) AS $f3, $SUM0(sum$2) AS $f4, COUNT(count$3) AS $f5])
+ +- Exchange(distribution=[hash[a, $f2]])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+ +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1106,8 +1092,7 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- GroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(DISTINCT c) AS $f2_0])
+- Exchange(distribution=[hash[a, $f2]])
+- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- Calc(select=[a, c])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1131,8 +1116,7 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- Exchange(distribution=[hash[a, $f2]])
+- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- Calc(select=[a, c])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -1228,11 +1212,10 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
+- Exchange(distribution=[hash[a, $f3]], updateAsRetraction=[true], accMode=[AccRetract])
+- Calc(select=[a, b, $f2, $f3, =($e, 0) AS $g_0, =($e, 1) AS $g_1], updateAsRetraction=[true], accMode=[AccRetract])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[a, b, $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+ +- GroupAggregate(groupBy=[c], select=[c, AVG(a) AS a, AVG(b) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -1266,12 +1249,11 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, $f2, $f3, =($e, 0) AS $g_0, =($e, 1) AS $g_1], updateAsRetraction=[true], accMode=[AccRetract])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[a, b, $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
- +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+ +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+ +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml
index 379e14d..0d6d340 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/GroupingSetsTest.xml
@@ -700,9 +700,9 @@ LogicalProject(c=[$0])
<Resource name="planAfter">
<![CDATA[
Calc(select=[c])
-+- Sort(orderBy=[$f2 ASC, c ASC])
++- Sort(orderBy=[EXPR$1 ASC, c ASC])
+- Exchange(distribution=[single])
- +- Calc(select=[deptno, c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS $f2])
+ +- Calc(select=[c, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS EXPR$1])
+- GroupAggregate(groupBy=[deptno, $e], select=[deptno, $e, COUNT(*) AS c])
+- Exchange(distribution=[hash[deptno, $e]])
+- Expand(projects=[{deptno=[$0], $e=[0]}, {deptno=[null], $e=[1]}])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 2bb5deb..5b4a016 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -44,9 +44,8 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- LocalGroupAggregate(groupBy=[a, $f4], partialFinalType=[PARTIAL], select=[a, $f4, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, SUM(b) FILTER $g_10 AS sum$2, DISTINCT(b) AS distinct$0])
+- Calc(select=[a, b, $f2, $f3, $f4, AND(=($e, 0), $f2) AS $g_0, AND(=($e, 1), $f3) AS $g_1, AND(=($e, 1), $f2) AS $g_10])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[$4], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[$3], $f4=[null], $e=[1]}])
- +- Calc(select=[a, b, $f2, $f3, MOD(HASH_CODE(b), 1024) AS $f4])
- +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- Calc(select=[a, b, IS TRUE(<>(b, 2:BIGINT)) AS $f2, IS TRUE(<>(b, 5:BIGINT)) AS $f3, MOD(HASH_CODE(b), 1024) AS $f4])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -79,15 +78,13 @@ GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, FIRST_VALUE_RET
+- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, FIRST_VALUE_RETRACT(c) AS $f2_0, LAST_VALUE_RETRACT(c) AS $f3, COUNT_RETRACT(DISTINCT c) AS $f4], updateAsRetraction=[true], accMode=[AccRetract])
+- Exchange(distribution=[hash[b, $f2]], updateAsRetraction=[true], accMode=[AccRetract])
+- Calc(select=[b, c, MOD(HASH_CODE(c), 1024) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[b, c], updateAsRetraction=[true], accMode=[AccRetract])
- +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
- +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, MAX(max$1) AS max$1], updateAsRetraction=[true], accMode=[Acc])
- +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
- +- Calc(select=[a, b], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, MAX(max$1) AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[a]], updateAsRetraction=[true], accMode=[Acc])
+ +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, MAX(max$1) AS max$1], updateAsRetraction=[true], accMode=[Acc])
+ +- Exchange(distribution=[hash[a, $f2]], updateAsRetraction=[true], accMode=[Acc])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) AS count$0, MAX(b) AS max$1, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
@@ -150,8 +147,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0, SU
+- Calc(select=[a, b, $f2, $f3, =($e, 1) AS $g_1, =($e, 2) AS $g_2])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}, {a=[$0], b=[$1], $f2=[null], $f3=[$3], $e=[2]}])
+- Calc(select=[a, b, MOD(HASH_CODE(a), 1024) AS $f2, MOD(HASH_CODE(b), 1024) AS $f3])
- +- Calc(select=[a, b])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -237,8 +233,7 @@ GlobalGroupAggregate(partialFinalType=[FINAL], select=[$SUM0(count$0) AS $f0])
+- Exchange(distribution=[hash[$f1]])
+- LocalGroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[c, MOD(HASH_CODE(c), 1024) AS $f1])
- +- Calc(select=[c])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -285,18 +280,16 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)], EXP
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[a, $f1, $f2, CAST($f3) AS $f3])
-+- Calc(select=[a, $f1, $f2, /($f3, $f4) AS $f3])
- +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, $SUM0(count$3) AS $f4])
- +- Exchange(distribution=[hash[a]])
- +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
- +- Exchange(distribution=[hash[a, $f2]])
- +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
- +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
- +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
- +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
- +- Calc(select=[a, b])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[a, $f1, $f2, CAST(/($f3, $f4)) AS $f3])
++- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, $SUM0(count$3) AS $f4])
+ +- Exchange(distribution=[hash[a]])
+ +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT(distinct$0 count$0) AS count$0, SUM(sum$1) AS sum$1, $SUM0(sum$2) AS sum$2, COUNT(count$3) AS count$3])
+ +- Exchange(distribution=[hash[a, $f2]])
+ +- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 b) FILTER $g_0 AS count$0, SUM(b) FILTER $g_1 AS sum$1, $SUM0(b) FILTER $g_1 AS sum$2, COUNT(b) FILTER $g_1 AS count$3, DISTINCT(b) AS distinct$0])
+ +- Calc(select=[a, b, $f2, =($e, 0) AS $g_0, =($e, 1) AS $g_1])
+ +- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $e=[0]}, {a=[$0], b=[$1], $f2=[null], $e=[1]}])
+ +- Calc(select=[a, b, MOD(HASH_CODE(b), 1024) AS $f2])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -319,8 +312,7 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
+- Exchange(distribution=[hash[a, $f2]])
+- LocalGroupAggregate(groupBy=[a, $f2], partialFinalType=[PARTIAL], select=[a, $f2, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0])
+- Calc(select=[a, c, MOD(HASH_CODE(c), 1024) AS $f2])
- +- Calc(select=[a, c])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
@@ -353,12 +345,11 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RET
+- LocalGroupAggregate(groupBy=[a, $f3], partialFinalType=[PARTIAL], select=[a, $f3, COUNT_RETRACT(distinct$0 b) FILTER $g_0 AS count$0, COUNT_RETRACT(*) FILTER $g_1 AS count1$1, COUNT_RETRACT(*) AS count1$2, DISTINCT(b) AS distinct$0], updateAsRetraction=[true], accMode=[Acc])
+- Calc(select=[a, b, $f2, $f3, =($e, 0) AS $g_0, =($e, 1) AS $g_1], updateAsRetraction=[true], accMode=[AccRetract])
+- Expand(projects=[{a=[$0], b=[$1], $f2=[$2], $f3=[$3], $e=[0]}, {a=[$0], b=[$1], $f2=[$2], $f3=[null], $e=[1]}], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[a, b, $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
- +- Calc(select=[a, b, 1 AS $f2], updateAsRetraction=[true], accMode=[AccRetract])
- +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
- +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
- +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
- +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
+ +- Calc(select=[a, b, 1 AS $f2, MOD(HASH_CODE(b), 1024) AS $f3], updateAsRetraction=[true], accMode=[AccRetract])
+ +- GlobalGroupAggregate(groupBy=[c], select=[c, AVG((sum$0, count$1)) AS a, AVG((sum$2, count$3)) AS b], updateAsRetraction=[true], accMode=[AccRetract])
+ +- Exchange(distribution=[hash[c]], updateAsRetraction=[true], accMode=[Acc])
+ +- LocalGroupAggregate(groupBy=[c], select=[c, AVG(a) AS (sum$0, count$1), AVG(b) AS (sum$2, count$3)], updateAsRetraction=[true], accMode=[Acc])
+ +- TableSourceScan(table=[[MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], updateAsRetraction=[true], accMode=[Acc])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.scala
new file mode 100644
index 0000000..858e793
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/ConvertToNotInOrInRuleTest.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.optimize.program._
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Plan tests for [[ConvertToNotInOrInRule]].
+ *
+ * The batch optimizer is cut down to one HEP program that holds only the rule under test. Every
+ * case passes a single query over `MyTable` to `verifyPlan`. The suffixes in the test names
+ * (`LessThanThreshold`, `EqualsToThreshold`, ...) refer to a threshold owned by the rule; its
+ * value is not visible in this file.
+ */
+class ConvertToNotInOrInRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  /** Swaps in the single-rule optimizer program and registers `MyTable`. */
+  @Before
+  def setup(): Unit = {
+    val ruleOnly = new FlinkChainedProgram[BatchOptimizeContext]()
+    ruleOnly.addLast(
+      "rules",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(RuleSets.ofList(ConvertToNotInOrInRule.INSTANCE))
+        .build())
+
+    val configBuilder = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+    util.tableEnv.getConfig.setCalciteConfig(configBuilder.replaceBatchProgram(ruleOnly).build())
+
+    // Column types: a Int, b Long, c Float, d Double, e String.
+    util.addTableSource[(Int, Long, Float, Double, String)]("MyTable", 'a, 'b, 'c, 'd, 'e)
+  }
+
+  // ---- OR-ed equality predicates ----
+
+  // Three equalities on the Int column a.
+  @Test
+  def testConvertToIn_LessThanThreshold_Int(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Four equalities on the Int column a.
+  @Test
+  def testConvertToIn_EqualsToThreshold_Int(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Five equalities on the Int column a.
+  @Test
+  def testConvertToIn_GreaterThanThreshold_Int(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // 19 equalities (d = 1 .. d = 19) on the Double column d.
+  @Test
+  def testConvertToIn_LessThanThreshold_Double(): Unit = {
+    val equalities = for (i <- 1 until 20) yield s"d = $i"
+    val where = equalities.mkString(" OR ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  // 20 equalities (d = 1 .. d = 20) on the Double column d.
+  @Test
+  def testConvertToIn_GreaterThanThreshold_Double(): Unit = {
+    val equalities = for (i <- 1 until 21) yield s"d = $i"
+    val where = equalities.mkString(" OR ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  // Four equalities on a followed by an equality on another column.
+  @Test
+  def testConvertToIn_WithOr1(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Same predicates as WithOr1, with the equality on b placed in the middle.
+  @Test
+  def testConvertToIn_WithOr2(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR b = 1 OR a = 3 OR a = 4"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // The parenthesized disjunction on a is AND-ed with b = 1.
+  @Test
+  def testConvertToIn_WithAnd1(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE (a = 1 OR a = 2 OR a = 3 OR a = 4) AND b = 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // No parentheses: AND binds tighter than OR, so b = 1 is combined with a = 4 only.
+  @Test
+  def testConvertToIn_WithAnd2(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 AND b = 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // ---- AND-ed inequality predicates ----
+
+  // Three inequalities on the Int column a.
+  @Test
+  def testConvertToNotIn_LessThanThreshold_Int(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Four inequalities on the Int column a.
+  @Test
+  def testConvertToNotIn_EqualsToThreshold_Int(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // NOTE(review): the fifth predicate is "a = 5", not "a <> 5", so this query holds only four
+  // inequalities on a. Confirm whether that is intended; changing the query also requires
+  // regenerating the recorded plan for this test.
+  @Test
+  def testConvertToNotIn_GreaterThanThreshold_Int(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND a = 5"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // 19 inequalities (d <> 1 .. d <> 19) on the Double column d.
+  @Test
+  def testConvertToNotIn_LessThanThreshold_Double(): Unit = {
+    val inequalities = for (i <- 1 until 20) yield s"d <> $i"
+    val where = inequalities.mkString(" AND ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  // 20 inequalities (d <> 1 .. d <> 20) on the Double column d.
+  @Test
+  def testConvertToNotIn_GreaterThanThreshold_Double(): Unit = {
+    val inequalities = for (i <- 1 until 21) yield s"d <> $i"
+    val where = inequalities.mkString(" AND ")
+    util.verifyPlan(s"SELECT * FROM MyTable WHERE $where")
+  }
+
+  // The parenthesized conjunction on a is OR-ed with b = 1.
+  @Test
+  def testConvertToNotIn_WithOr1(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4) OR b =" +
+      " 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Same predicates as WithOr1 without parentheses; AND still binds tighter than OR.
+  @Test
+  def testConvertToNotIn_WithOr2(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 OR b = 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // The inequalities are joined with OR instead of AND.
+  @Test
+  def testConvertToNotIn_WithOr3(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 OR a <> 2 OR a <> 3 OR a <> 4 OR b = 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Four inequalities on a followed by an equality on another column.
+  @Test
+  def testConvertToNotIn_WithAnd1(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND a <> 3 AND a <> 4 AND b = 1"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Same predicates as WithAnd1, with the equality on b placed in the middle.
+  @Test
+  def testConvertToNotIn_WithAnd2(): Unit = {
+    util.verifyPlan("SELECT * FROM MyTable WHERE a <> 1 AND a <> 2 AND b = 1 AND a <> 3 AND a <> 4")
+  }
+
+  // ---- equality and inequality groups in the same predicate ----
+
+  // Equalities on a, an equality on b and a parenthesized inequality group, all OR-ed.
+  @Test
+  def testConvertToInAndNotIn1(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 1 OR a = 2 OR a = 3 OR a = 4 OR b = 1 " +
+      "OR (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // AND binds tighter than OR, so the inequality group is combined with a = 4 only.
+  @Test
+  def testConvertToInAndNotIn2(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE b = 1 OR a = 1 OR a = 2 OR a = 3 OR a = 4 " +
+      "AND (a <> 1 AND a <> 2 AND a <> 3 AND a <> 4)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // Equalities on b and c surround a conjunction that mixes inequalities on a with c = 1.
+  @Test
+  def testConvertToInAndNotIn3(): Unit = {
+    val sqlQuery = "SELECT * FROM MyTable WHERE b = 1 OR b = 2 OR (a <> 1 AND a <> 2 AND a <> 3 " +
+      "AND a <> 4 AND c = 1) OR b = 3 OR b = 4 OR c = 1"
+    util.verifyPlan(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.scala
new file mode 100644
index 0000000..15dd6ef
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkCalcMergeRuleTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.plan.optimize.program._
+import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.RuleSets
+import org.junit.{Before, Test}
+
+/**
+ * Plan tests for [[FlinkCalcMergeRule]].
+ *
+ * The batch optimizer is replaced by two chained programs: a HEP program named "table_ref" that
+ * runs `FlinkBatchRuleSets.TABLE_REF_RULES`, followed by a Volcano program named "logical" that
+ * holds `FilterToCalcRule`, `ProjectToCalcRule`, the merge rule under test and the converters of
+ * `FlinkLogicalCalc` and `FlinkLogicalTableSourceScan`.
+ */
+class FlinkCalcMergeRuleTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  /** Installs the two-phase optimizer and registers the three-column source `MyTable`. */
+  @Before
+  def setup(): Unit = {
+    // Rule set of the "logical" Volcano phase; FlinkCalcMergeRule is the rule under test.
+    val logicalRules = RuleSets.ofList(
+      FilterToCalcRule.INSTANCE,
+      ProjectToCalcRule.INSTANCE,
+      FlinkCalcMergeRule.INSTANCE,
+      FlinkLogicalCalc.CONVERTER,
+      FlinkLogicalTableSourceScan.CONVERTER)
+
+    val chain = new FlinkChainedProgram[BatchOptimizeContext]()
+    chain.addLast(
+      "table_ref",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(FlinkBatchRuleSets.TABLE_REF_RULES)
+        .build())
+    chain.addLast(
+      "logical",
+      FlinkVolcanoProgramBuilder.newBuilder
+        .add(logicalRules)
+        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
+        .build())
+
+    val configBuilder = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+    util.tableEnv.getConfig.setCalciteConfig(configBuilder.replaceBatchProgram(chain).build())
+
+    // Column types: a Int, b Int, c String.
+    util.addTableSource[(Int, Int, String)]("MyTable", 'a, 'b, 'c)
+  }
+
+  // The inner query filters on "a = b" and the outer query on the mirrored "b = a".
+  @Test
+  def testCalcMergeWithSameDigest(): Unit = {
+    val sqlQuery = "SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b = a"
+    util.verifyPlan(sqlQuery)
+  }
+
+  // The inner projection calls the NonDeterministicUdf; the outer filter reads its result.
+  @Test
+  def testCalcMergeWithNonDeterministicExpr1(): Unit = {
+    val sqlQuery = "SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10"
+    util.addFunction("random_udf", new NonDeterministicUdf)
+    util.verifyPlan(sqlQuery)
+  }
+
+  // The NonDeterministicUdf appears only in the outer filter, above a plain projection.
+  @Test
+  def testCalcMergeWithNonDeterministicExpr2(): Unit = {
+    util.addFunction("random_udf", new NonDeterministicUdf)
+    util.verifyPlan("SELECT a FROM (SELECT a FROM MyTable) t WHERE random_udf(a) > 10")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.scala
new file mode 100644
index 0000000..7d7744b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteCoalesceRuleTest.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.calcite.CalciteConfig
+import org.apache.flink.table.codegen.CodeGenException
+import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE}
+import org.apache.flink.table.plan.rules.FlinkBatchRuleSets
+import org.apache.flink.table.util.TableTestBase
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.junit.{Before, Test}
+
+import java.sql.Date
+
+/**
+ * Test for [[RewriteCoalesceRule]].
+ *
+ * The batch optimizer is replaced by a single HEP program named "default_rewrite" that runs
+ * `FlinkBatchRuleSets.DEFAULT_REWRITE_RULES`. Every query joins the tables `scott_emp` and
+ * `scott_dept` (registered in `setup`) either with `USING (deptno)` or with a NATURAL join.
+ * NOTE(review): presumably such joins are what introduce COALESCE on the shared `deptno`
+ * column - confirm against the rule and the recorded plans.
+ */
+class RewriteCoalesceRuleTest extends TableTestBase {
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = { // installs the single-program optimizer and registers both tables
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+ programs.addLast(
+ "default_rewrite",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkBatchRuleSets.DEFAULT_REWRITE_RULES)
+ .build())
+
+ val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig)
+ .replaceBatchProgram(programs).build()
+ util.tableEnv.getConfig.setCalciteConfig(calciteConfig)
+
+ util.addTableSource[(Int, String, String, Int, Date, Double, Double, Int)](
+ "scott_emp", 'empno, 'ename, 'job, 'mgr, 'hiredate, 'sal, 'comm, 'deptno)
+ util.addTableSource[(Int, String, String)]("scott_dept", 'deptno, 'dname, 'loc)
+ }
+
+ @Test
+ def testCalcite1018(): Unit = { // LEFT JOIN ... USING (deptno), then ORDER BY empno LIMIT 10
+ val sqlQuery =
+ """
+ |select * from (select * from scott_emp) e left join (
+ | select * from scott_dept d) using (deptno)
+ | order by empno limit 10
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testCoalesceConstantReduce(): Unit = { // JOIN ... USING (deptno) filtered on e.deptno = 10
+ val sqlQuery =
+ """
+ |select * from lateral (select * from scott_emp) as e
+ | join (table scott_dept) using (deptno)
+ | where e.deptno = 10
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test(expected = classOf[CodeGenException])
+ // TODO remove the expected CodeGenException after [FLINK-12371] is merged
+ def testCalcite864_1(): Unit = { // correlated scalar sub-query that itself joins USING (deptno)
+ val sqlQuery =
+ """
+ |select *
+ | from scott_emp as e
+ | join scott_dept as d using (deptno)
+ | where sal = (
+ | select max(sal)
+ | from scott_emp as e2
+ | join scott_dept as d2 using (deptno)
+ | where d2.deptno = d.deptno)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test(expected = classOf[CodeGenException])
+ // TODO remove the expected CodeGenException after [FLINK-12371] is merged
+ def testCalcite864_3(): Unit = { // correlated scalar sub-query over scott_dept only
+ val sqlQuery =
+ """
+ |select *
+ | from scott_emp as e
+ | join scott_dept as d using (deptno)
+ | where d.dname = (
+ | select max(dname)
+ | from scott_dept as d2
+ | where d2.deptno = d.deptno)
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testNaturalJoinWithPredicates(): Unit = { // NATURAL JOIN filtered on empno = 1
+ val sqlQuery =
+ """
+ |select * from scott_dept natural join scott_emp where empno = 1
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testNaturalJoinLeftOuter(): Unit = { // NATURAL LEFT JOIN ordered by both deptno columns
+ val sqlQuery =
+ """
+ |SELECT * FROM scott_dept
+ | natural left join scott_emp
+ | order by scott_dept.deptno, scott_emp.deptno
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testNaturalJoinRightOuter(): Unit = { // NATURAL RIGHT JOIN ordered by both deptno columns
+ val sqlQuery =
+ """
+ |SELECT * FROM scott_dept
+ | natural right join scott_emp
+ | order by scott_dept.deptno, scott_emp.deptno
+ """.stripMargin
+ util.verifyPlan(sqlQuery)
+ }
+}