Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 11:39:24 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #13577: [FLINK-16579][table] Upgrade Calcite version to 1.26 for Flink SQL

godfreyhe commented on a change in pull request #13577:
URL: https://github.com/apache/flink/pull/13577#discussion_r504367568



##########
File path: flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
##########
@@ -188,7 +190,6 @@ public void testDescribeTable() {
 	/**
 	 * Here we override the super method to avoid test error from `describe statement` supported in original calcite.
 	 */
-	@Override

Review comment:
       was this missed during the update?

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
##########
@@ -372,6 +373,9 @@ object UserDefinedFunctionUtils {
 
       override def getConsistency: Consistency = Consistency.NONE
 
+      override def paramTypes(typeFactory: RelDataTypeFactory): util.List[RelDataType] = null

Review comment:
       does `null` have a special meaning here? If not, return an empty list to avoid an NPE.
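
   For illustration, a minimal sketch of the suggested fix, using the names from the anonymous implementation in this diff:

   ```scala
   // Sketch only: return an empty list instead of null so callers that iterate
   // over the parameter types do not hit a NullPointerException.
   override def paramTypes(typeFactory: RelDataTypeFactory): util.List[RelDataType] =
     java.util.Collections.emptyList()
   ```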

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
##########
@@ -165,18 +165,28 @@ object AggSqlFunction {
     }
   }
 
-  private[flink] def createOperandTypeChecker(
+  private[flink] def createOperandMetadata(
       name: String,
       aggregateFunction: ImperativeAggregateFunction[_, _],
       externalAccType: DataType)
-    : SqlOperandTypeChecker = {
+    : SqlOperandMetadata = {
 
     val methods = checkAndExtractMethods(aggregateFunction, "accumulate")
 
     /**
       * Operand type checker based on [[AggregateFunction]] given information.
       */
-    new SqlOperandTypeChecker {
+    new SqlOperandMetadata {
+      override def paramNames(): util.List[String] = {
+        // Does not support named parameters.
+        Collections.emptyList()

Review comment:
       nit: `util.Collections`

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
##########
@@ -40,12 +44,19 @@ class TableSqlFunction(
     functionImpl: FlinkTableFunctionImpl[_])
   extends SqlUserDefinedTableFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
+    SqlKind.OTHER_FUNCTION,
     ReturnTypes.CURSOR,
     createEvalOperandTypeInference(name, tableFunction, typeFactory),
-    createEvalOperandTypeChecker(name, tableFunction),
-    null,
+    createEvalOperandMetadata(name, tableFunction),
     functionImpl) {
 
+  override def getRowTypeInference: SqlReturnTypeInference = new SqlReturnTypeInference {
+    override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
+      // The arguments should never be used.
+      functionImpl.getRowType(opBinding.getTypeFactory, Collections.emptyList())
+    }
+  }

Review comment:
       is this necessary?

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/CalcTest.scala
##########
@@ -54,12 +54,17 @@ class CalcTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val resultStr = (1 to 30).map(i => s"$i:BIGINT").mkString(", ")
+    val resultStr = "Sarg[1L:BIGINT, 2L:BIGINT, 3L:BIGINT, 4L:BIGINT, 5L:BIGINT, " +
+        "6L:BIGINT, 7L:BIGINT, 8L:BIGINT, 9L:BIGINT, 10L:BIGINT, 11L:BIGINT, " +
+        "12L:BIGINT, 13L:BIGINT, 14L:BIGINT, 15L:BIGINT, 16L:BIGINT, " +
+        "17L:BIGINT, 18L:BIGINT, 19L:BIGINT, 20L:BIGINT, 21L:BIGINT, " +
+        "22L:BIGINT, 23L:BIGINT, 24L:BIGINT, 25L:BIGINT, 26L:BIGINT, " +
+        "27L:BIGINT, 28L:BIGINT, 29L:BIGINT, 30L:BIGINT]:BIGINT"

Review comment:
       it can be simplified as `(1 to 30).map(i => s"${i}L:BIGINT").mkString("Sarg[", ", ", "]:BIGINT")`

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/CalcTest.scala
##########
@@ -73,12 +78,23 @@ class CalcTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val resultStr = (1 to 30).map(i => s"$i:BIGINT").mkString(", ")
+    val resultStr = "Sarg[(-∞..1L:BIGINT), (1L:BIGINT..2L:BIGINT), " +
+        "(2L:BIGINT..3L:BIGINT), (3L:BIGINT..4L:BIGINT), " +
+        "(4L:BIGINT..5L:BIGINT), (5L:BIGINT..6L:BIGINT), " +
+        "(6L:BIGINT..7L:BIGINT), (7L:BIGINT..8L:BIGINT), " +
+        "(8L:BIGINT..9L:BIGINT), (9L:BIGINT..10L:BIGINT), " +
+        "(10L:BIGINT..11L:BIGINT), (11L:BIGINT..12L:BIGINT), (12L:BIGINT..13L:BIGINT), " +
+        "(13L:BIGINT..14L:BIGINT), (14L:BIGINT..15L:BIGINT), (15L:BIGINT..16L:BIGINT), " +
+        "(16L:BIGINT..17L:BIGINT), (17L:BIGINT..18L:BIGINT), (18L:BIGINT..19L:BIGINT), " +
+        "(19L:BIGINT..20L:BIGINT), (20L:BIGINT..21L:BIGINT), (21L:BIGINT..22L:BIGINT), " +
+        "(22L:BIGINT..23L:BIGINT), (23L:BIGINT..24L:BIGINT), (24L:BIGINT..25L:BIGINT), " +
+        "(25L:BIGINT..26L:BIGINT), (26L:BIGINT..27L:BIGINT), (27L:BIGINT..28L:BIGINT), " +
+        "(28L:BIGINT..29L:BIGINT), (29L:BIGINT..30L:BIGINT), (30L:BIGINT..+∞)]:BIGINT"

Review comment:
       can we simplify this result expression? I think `(1L:BIGINT..2L:BIGINT) ... (29L:BIGINT..30L:BIGINT)` is meaningless, because `b` is of BIGINT type, not DOUBLE type.
   
   I find that the result of another CalcTest does not contain the type (BIGINT); can we also simplify it?

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
##########
@@ -204,20 +204,16 @@ class TimeIndicatorConversionTest extends TableTestBase {
 
     val result = t.unionAll(t).select('rowtime)
 
-    val expected = binaryNode(
-      "DataStreamUnion",
-      unaryNode(
-        "DataStreamCalc",
+    val expected = unaryNode(
+      "DataStreamCalc",
+      binaryNode(
+        "DataStreamUnion",
         streamTableNode(t),
-        term("select", "rowtime")
-      ),
-      unaryNode(
-        "DataStreamCalc",
         streamTableNode(t),
-        term("select", "rowtime")
+        term("all", "true"),
+        term("union all", "rowtime, long, int")
       ),
-      term("all", "true"),
-      term("union all", "rowtime")
+      term("select", "rowtime")

Review comment:
       the plan is worse than before, because more fields need to be shuffled.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
##########
@@ -0,0 +1,2955 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql2rel;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.MultimapBuilder;
+import com.google.common.collect.Sets;
+import com.google.common.collect.SortedSetMultimap;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Function2;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCostImpl;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelHomogeneousShuttle;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.rules.FilterCorrelateRule;
+import org.apache.calcite.rel.rules.FilterJoinRule;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlCountAggFunction;
+import org.apache.calcite.sql.fun.SqlSingleValueAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.Holder;
+import org.apache.calcite.util.ImmutableBeans;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.ReflectUtil;
+import org.apache.calcite.util.ReflectiveVisitor;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Copied to fix CALCITE-4333, should be removed for the next Calcite upgrade.
+ *
+ * <p>Changes: Line 672 ~ Line 682, Line 430 ~ Line 441.

Review comment:
       nit: Line 671 ~ 681

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
##########
@@ -458,19 +458,15 @@ object BuiltInMethods {
   val TRUNCATE_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "struncate",
     classOf[DecimalData], classOf[Int])
 
-  // TODO: remove if CALCITE-3199 fixed
-  //  https://issues.apache.org/jira/browse/CALCITE-3199
-  val UNIX_DATE_CEIL = Types.lookupMethod(classOf[SqlDateTimeUtils], "unixDateCeil",
+  val UNIX_DATE_CEIL = Types.lookupMethod(classOf[DateTimeUtils], "unixDateCeil",

Review comment:
       please also remove the TODO in SqlDateTimeUtils class around line 1488 since CALCITE-3199 is fixed

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -70,9 +70,29 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
   private def getTableUniqueKeys(
       tableSource: TableSource[_],
       relOptTable: RelOptTable): JSet[ImmutableBitSet] = {
-    // TODO get uniqueKeys from TableSchema of TableSource
 
     relOptTable match {
+      case sourceTable: TableSourceTable =>

Review comment:
       is this necessary in this PR, or should we support it in another PR?
   
   btw, please add a related test in `FlinkRelMdUniqueKeysTest`

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
##########
@@ -20,15 +20,15 @@ package org.apache.flink.table.planner.plan.utils
 import org.apache.flink.table.planner.catalog.QueryOperationCatalogViewTable
 
 import com.google.common.collect.Sets
-import org.apache.calcite.plan.{RelOptUtil, ViewExpanders}
-import org.apache.calcite.rel.core.{TableFunctionScan, TableScan}
+import org.apache.calcite.plan.ViewExpanders
+import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical._
-import org.apache.calcite.rel.{RelNode, RelShuttle, RelShuttleImpl}
+import org.apache.calcite.rel.{RelHomogeneousShuttle, RelNode, RelShuttleImpl}
 import org.apache.calcite.rex.{RexNode, RexShuttle, RexSubQuery}
 
 import scala.collection.JavaConversions._
 
-class DefaultRelShuttle extends RelShuttle {
+class DefaultRelShuttle extends RelHomogeneousShuttle {

Review comment:
       Very useful abstraction ~

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
##########
@@ -94,7 +94,13 @@ class BatchExecNestedLoopJoin(
       (buildRowSize + BinaryRowDataSerializer.LENGTH_SIZE_IN_BYTES) * shuffleBuildCount(mq)
     val cpuCost = leftRowCnt * rightRowCnt
     val costFactory = planner.getCostFactory.asInstanceOf[FlinkCostFactory]
-    costFactory.makeCost(mq.getRowCount(this), cpuCost, 0, 0, memoryCost)
+    val cost = costFactory.makeCost(mq.getRowCount(this), cpuCost, 0, 0, memoryCost)
+    if (singleRowJoin) {
+      // Make single row join more preferable than non-single row join.
+      cost.multiplyBy(0.99)
+    } else {
+      cost
+    }

Review comment:
       did we hit some corner cases?

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForRangeEndTest.xml
##########
@@ -131,14 +131,14 @@ WHERE rk <= 2 AND rk > -2
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], rk=[$2])
 +- LogicalFilter(condition=[AND(<=($2, 2), >($2, -2))])
-   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 NULLS FIRST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b,c], orderBy=[a ASC], select=[a, b, c, w0$o0])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[SEARCH(w0$o0, Sarg[(-2..2]])])
++- FlinkLogicalOverAggregate(window#0=[window(partition {1, 2} order by [0 ASC-nulls-first] aggs [RANK()])])

Review comment:
       bad case

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdPopulationSizeTest.scala
##########
@@ -294,28 +306,28 @@ class FlinkRelMdPopulationSizeTest extends FlinkRelMdHandlerTestBase {
 
     assertEquals(1.0, mq.getPopulationSize(logicalLeftJoinNotOnUniqueKeys, ImmutableBitSet.of()))
     assertEquals(2.0E7, mq.getPopulationSize(logicalLeftJoinNotOnUniqueKeys, ImmutableBitSet.of(0)))
-    assertEquals(505696447.06,
+    assertEquals(5.056964454581646E8,
       mq.getPopulationSize(logicalLeftJoinNotOnUniqueKeys, ImmutableBitSet.of(1)), 1e-2)
-    assertEquals(799999979.15,
+    assertEquals(8.0E8,
       mq.getPopulationSize(logicalLeftJoinNotOnUniqueKeys, ImmutableBitSet.of(1, 5)), 1e-2)
-    assertEquals(793772745.78,
+    assertEquals(7.937719925300186E8,
       mq.getPopulationSize(logicalLeftJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 6)), 1e-2)
 
     assertEquals(1.0,
       mq.getPopulationSize(logicalRightJoinOnLHSUniqueKeys, ImmutableBitSet.of()))
-    assertEquals(12642411.178,
+    assertEquals(1.2642411364806734E7,
       mq.getPopulationSize(logicalRightJoinOnLHSUniqueKeys, ImmutableBitSet.of(0)), 1e-2)
-    assertEquals(19752070.37,
+    assertEquals(1.9752070270976853E7,
       mq.getPopulationSize(logicalRightJoinOnLHSUniqueKeys, ImmutableBitSet.of(1)), 1e-2)
-    assertEquals(19999999.87,
+    assertEquals(2.0E7,
       mq.getPopulationSize(logicalRightJoinOnLHSUniqueKeys, ImmutableBitSet.of(1, 5)), 1e-2)
-    assertEquals(19996088.14,
+    assertEquals(1.9996069026214965E7,
       mq.getPopulationSize(logicalRightJoinOnLHSUniqueKeys, ImmutableBitSet.of(0, 6)), 1e-2)
 
     assertEquals(1.0, mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of()))
     assertEquals(2.0E7, mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of(0)))
     assertEquals(8.0E8, mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of(1)))
-    assertEquals(6.295509444597865E15,
+    assertEquals(8.0E15,

Review comment:
       is this within our expectations?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
##########
@@ -23,7 +23,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram
 import org.apache.flink.table.planner.utils.{MockPythonTableFunction, TableFunc0, TableFunc1, TableTestBase}
 
-import org.apache.calcite.rel.rules.{CalcMergeRule, FilterCalcMergeRule, ProjectCalcMergeRule}
+import org.apache.calcite.rel.rules.{CalcMergeRule, CoreRules, FilterCalcMergeRule, ProjectCalcMergeRule}

Review comment:
       remove unused imports

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/SetOperatorsTest.xml
##########
@@ -100,10 +100,9 @@ LogicalProject(b=[$1], c=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Union(all=[true], union=[b, c])
-:- Calc(select=[b, c])
-:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, left, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Calc(select=[b, c])
+Calc(select=[b, c])

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala
##########
@@ -114,11 +114,13 @@ class CalcTest extends TableTestBase {
     val resultTable = sourceTable.select('a, 'b, 'c)
       .where((1 to 30).map($"b" === _).reduce((ex1, ex2) => ex1 || ex2) && ($"c" === "xx"))
 
+    val operands = "Sarg[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, " +

Review comment:
       can `SEARCH(c, Sarg['xx']:CHAR(2))` be simplified to `=(c, 'xx')`?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/calcite/schema/Statistic.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schema;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statistics about a {@link Table}.
+ *
+ * <p>Each of the methods may return {@code null} meaning "not known".</p>
+ *
+ * <p>Changes:
+ *
+ * <ul>
+ *     <li>Line 61: default collations change from null to empty list.</li>

Review comment:
       the legacy planner also has a custom statistic class: `FlinkStatistic`

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoLegacyTableSourceScanRuleTest.scala
##########
@@ -111,6 +111,9 @@ class PushProjectIntoLegacyTableSourceScanRuleTest extends TableTestBase {
 
   @Test
   def testProjectWithoutInputRef(): Unit = {
+    // Regression by: CALCITE-4220,

Review comment:
       we can implement a rule to re-add a projection node for one-phase aggregation

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
##########
@@ -2679,15 +2679,15 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "1017-11-29 22:58:58.998")
 
     val QUARTER = Seq(
-      "2018-03-01 22:58:58.998",
-      "2018-08-31 22:58:58.998",
+      "2018-02-28 22:58:58.998",

Review comment:
       Is this within our expectations? What changes (or bug fixes) caused this?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
##########
@@ -343,6 +344,54 @@ object FlinkRexUtil {
       }
     })
 
+  /**
+   * Returns whether a given tree contains any {@link RexInputRef} nodes
+   * with given indices.
+   *
+   * @param node a RexNode tree
+   */
+  def containsInputRef(node: RexNode, refs: JSet[Integer]): Boolean = try {
+    val visitor = new RexVisitorImpl[Void](true) {
+      override def visitInputRef(inputRef: RexInputRef): Void = {
+        if (refs.contains(inputRef.getIndex)) {
+          throw new Util.FoundOne(inputRef)
+        }
+        null
+      }
+    }
+    node.accept(visitor)
+    false
+  } catch {
+    case e: Util.FoundOne =>
+      Util.swallow(e, null)
+      true
+  }

Review comment:
       It seems this method is not used anywhere.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
##########
@@ -129,20 +138,46 @@ object TableSqlFunction {
         }
   }
 
-  private[flink] def createOperandTypeChecker(
+  private[flink] def createOperandMetadata(
       name: String,
-      udtf: TableFunction[_]): SqlOperandTypeChecker = {
-    new OperandTypeChecker(name, udtf, checkAndExtractMethods(udtf, "eval"))
+      udtf: TableFunction[_]): SqlOperandMetadata = {
+    new OperandMetadata(name, udtf, checkAndExtractMethods(udtf, "eval"))
+  }
+
+  /**
+   * Converts arguments from [[org.apache.calcite.sql.SqlNode]] to
+   * java object format.
+   *
+   * @param callBinding Operator bound to arguments
+   * @param function target function to get parameter types from
+   * @param opName name of the operator to use in error message
+   * @return converted list of arguments
+   */
+  private[flink] def convertArguments(
+      callBinding: SqlOperatorBinding,
+      function: org.apache.calcite.schema.Function,
+      opName: SqlIdentifier): util.List[Object] = {
+    val arguments = new util.ArrayList[Object](callBinding.getOperandCount)
+    0 until callBinding.getOperandCount foreach { i =>
+      val operandType = callBinding.getOperandType(i)

Review comment:
       never used

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
##########
@@ -165,18 +165,28 @@ object AggSqlFunction {
     }
   }
 
-  private[flink] def createOperandTypeChecker(
+  private[flink] def createOperandMetadata(
       name: String,
       aggregateFunction: ImperativeAggregateFunction[_, _],
       externalAccType: DataType)
-    : SqlOperandTypeChecker = {
+    : SqlOperandMetadata = {
 
     val methods = checkAndExtractMethods(aggregateFunction, "accumulate")
 
     /**
       * Operand type checker based on [[AggregateFunction]] given information.
       */
-    new SqlOperandTypeChecker {
+    new SqlOperandMetadata {
+      override def paramNames(): util.List[String] = {
+        // Does not support named parameters.
+        Collections.emptyList()
+      }
+
+      override def paramTypes(typeFactory: RelDataTypeFactory): util.List[RelDataType] = {
+        // This should be never invoked.
+        null

Review comment:
       return an empty list to avoid an NPE, or throw an exception here directly
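
   A sketch of the second option, against the anonymous `SqlOperandMetadata` in this diff; failing fast documents the assumption that the method is never called:

   ```scala
   override def paramTypes(typeFactory: RelDataTypeFactory): util.List[RelDataType] = {
     // Sketch only: this operand metadata does not expose parameter types,
     // so fail fast instead of returning null.
     throw new UnsupportedOperationException("paramTypes should never be invoked")
   }
   ```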

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
##########
@@ -564,16 +562,14 @@ HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), rowType
     <Resource name="planBefore">
       <![CDATA[
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT EXPR$0)]
-+- LogicalProject($f0=[0]), rowType=[RecordType(INTEGER $f0)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
-   +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
-      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]], fields=[byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]

Review comment:
       this is a bad case, more data will be shuffled.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
##########
@@ -198,9 +198,14 @@ object ColumnIntervalUtil {
     */
   def getColumnIntervalWithFilter(
       originInterval: Option[ValueInterval],
-      predicate: RexNode,
+      oriPred: RexNode,

Review comment:
       please also update the param name in the doc comment

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
##########
@@ -191,4 +226,8 @@ class OperandTypeChecker(
   override def isOptional(i: Int): Boolean = false
 
   override def getConsistency: Consistency = Consistency.NONE
+
+  override def paramTypes(typeFactory: RelDataTypeFactory): util.List[RelDataType] = null

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimator.scala
##########
@@ -187,8 +188,8 @@ class SelectivityEstimator(rel: RelNode, mq: FlinkRelMetadataQuery)
       case IS_NOT_NULL =>
         estimateIsNotNull(operands.head)
 
-      case IN =>
-        estimateIn(operands.head, operands.slice(1, operands.size))
+      case SEARCH =>

Review comment:
       can we make sure all `IN`s have been converted to `SEARCH` ?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
##########
@@ -353,7 +402,7 @@ object FlinkRexUtil {
     */
   private[flink] def adjustInputRef(
       expr: RexNode,
-      fieldsOldToNewIndexMapping: Map[Int, Int]): RexNode = expr.accept(
+      fieldsOldToNewIndexMapping: Map[Int, Int]) = expr.accept(

Review comment:
       revert this ?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
##########
@@ -768,7 +768,8 @@ public void testRowTableFunction() throws Exception {
 
 		tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
 		tEnv().executeSql(
-				"INSERT INTO SinkTable SELECT t.s, t.sa FROM SourceTable, LATERAL TABLE(RowTableFunction(s)) t")
+				"INSERT INTO SinkTable SELECT t.s, t.sa FROM SourceTable source, "

Review comment:
       does `RowTableFunction(s)` no longer work?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
##########
@@ -108,15 +108,16 @@ object FlinkStreamRuleSets {
       REDUCE_EXPRESSION_RULES.asScala ++
       List(
         //removes constant keys from an Agg
-        AggregateProjectPullUpConstantsRule.INSTANCE,
+        CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS,
         // fix: FLINK-17553 unsupported call error when constant exists in group window key
         // this rule will merge the project generated by AggregateProjectPullUpConstantsRule and
         // make sure window aggregate can be correctly rewritten by StreamLogicalWindowAggregateRule
-        ProjectMergeRule.INSTANCE,
+        CoreRules.PROJECT_MERGE,
         StreamLogicalWindowAggregateRule.INSTANCE,
+//        WindowJoinRewriteRule.INSTANCE,

Review comment:
       remove this directly?

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
##########
@@ -84,16 +82,15 @@ WHERE rank_num >= 1 AND rank_num < 2
       <![CDATA[
 LogicalProject(a=[$0])
 +- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a])
-+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
-   +- FlinkLogicalCalc(select=[a, rowtime])
-      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]])
+FlinkLogicalCalc(select=[a], where=[SEARCH(w0$o0, Sarg[[1..2)])])
++- FlinkLogicalOverAggregate(window#0=[window(partition {0} order by [3 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RankTest.xml
##########
@@ -142,19 +171,17 @@ WHERE rk <= 2 AND rk > -2
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], rk=[$2])
 +- LogicalFilter(condition=[AND(<=($2, 2), >($2, -2))])
-   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 NULLS FIRST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, b, $2])
-+- Rank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b, c], orderBy=[a ASC], global=[true], select=[a, b, c, $2])
+Calc(select=[a, b, w0$o0 AS $2], where=[SEARCH(w0$o0, Sarg[(-2..2]])])
++- OverAggregate(partitionBy=[b, c], orderBy=[a ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0])

Review comment:
       bad case!

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLogicalRankRuleForConstantRangeTest.xml
##########
@@ -178,14 +178,14 @@ WHERE rk <= 2 AND rk > -2
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], rk=[$2])
 +- LogicalFilter(condition=[AND(<=($2, 2), >($2, -2))])
-   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 NULLS FIRST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, b, $2])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=-1, rankEnd=2], partitionBy=[b,c], orderBy=[a ASC], select=[a, b, c, $2])
+FlinkLogicalCalc(select=[a, b, w0$o0 AS $2], where=[SEARCH(w0$o0, Sarg[(-2..2]])])
++- FlinkLogicalOverAggregate(window#0=[window(partition {1, 2} order by [0 ASC-nulls-first] aggs [RANK()])])

Review comment:
       bad case

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
##########
@@ -198,9 +198,14 @@ object ColumnIntervalUtil {
     */
   def getColumnIntervalWithFilter(
       originInterval: Option[ValueInterval],
-      predicate: RexNode,
+      oriPred: RexNode,
       inputRef: Int,
       rexBuilder: RexBuilder): ValueInterval = {
+    val predicate = if (oriPred.getKind == SqlKind.SEARCH) {
+      RexUtil.expandSearch(rexBuilder, null, oriPred)
+    } else {
+      oriPred
+    }

Review comment:
       provide a utility method in FlinkRexUtil; there is a lot of similar code
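
   For example, the repeated pattern could be extracted into something like the following sketch (the helper name and its placement in `FlinkRexUtil` are only suggestions):

   ```scala
   /** Expands a SEARCH predicate into equivalent comparisons; other predicates pass through unchanged. */
   def expandSearchPredicate(rexBuilder: RexBuilder, predicate: RexNode): RexNode =
     if (predicate.getKind == SqlKind.SEARCH) {
       RexUtil.expandSearch(rexBuilder, null, predicate)
     } else {
       predicate
     }
   ```

   Call sites such as `getColumnIntervalWithFilter` could then just call the helper instead of repeating the `SqlKind.SEARCH` check.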

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
##########
@@ -574,16 +572,14 @@ SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), rowType
     <Resource name="planBefore">
       <![CDATA[
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT EXPR$0)]
-+- LogicalProject($f0=[0]), rowType=[RecordType(INTEGER $f0)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
-   +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
-      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105)]]], fields=[byte, short, int, long, float, double, boolean, string, date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
##########
@@ -539,6 +559,25 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
     }
   }
 
+  def getUniqueKeys(
+      rel: TableFunctionScan,
+      mq: RelMetadataQuery,
+      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+    if (rel.getInputs.size() == 1
+        && rel.getCall.asInstanceOf[RexCall].getOperator.isInstanceOf[SqlWindowTableFunction]) {
+      mq.getUniqueKeys(rel.getInput(0), ignoreNulls)
+    } else {
+      null
+    }
+  }
+
+  def getUniqueKeys(
+      rel: WatermarkAssigner,
+      mq: RelMetadataQuery,
+      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+    mq.getUniqueKeys(rel.getInput, ignoreNulls)
+  }

Review comment:
       ditto. 
   nit: please move this close to the TableScan part; this would make the code easier to read.

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
##########
@@ -699,10 +699,10 @@ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CAST(CASE(=($f1, 0), null:INTEGER, s)) AS s, CAST(CAST(/(CASE(=($f1, 0), null:INTEGER, s), $f1))) AS a, w$start AS wStart])
-+- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_$SUM0(sum$0) AS s, Final_COUNT(count1$1) AS $f1])
+Calc(select=[CAST(s) AS s, CAST(a) AS a, w$start AS wStart])
++- HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 900000)], properties=[w$start, w$end, w$rowtime], select=[Final_SUM(sum$0) AS s, Final_AVG(sum$1, count$2) AS a])

Review comment:
       is `AVG` no longer rewritten as `SUM / COUNT` now?

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
##########
@@ -111,16 +108,15 @@ WHERE rank_num >= 1 AND rank_num < 2
       <![CDATA[
 LogicalProject(a=[$0], rank_num=[$4])
 +- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, 1:BIGINT AS w0$o0])
-+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime])
-   +- FlinkLogicalCalc(select=[a, rowtime])
-      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]])
+FlinkLogicalCalc(select=[a, w0$o0 AS rank_num], where=[SEARCH(w0$o0, Sarg[[1..2)])])
++- FlinkLogicalOverAggregate(window#0=[window(partition {0} order by [3 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
##########
@@ -516,28 +516,28 @@ LogicalFilter(condition=[<>($cor0.b, $1)])
       <![CDATA[
 Join(joinType=[LeftAntiJoin], where=[<>(b, e)], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[single])
-:  +- Join(joinType=[LeftSemiJoin], where=[$f0], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-:     :- Exchange(distribution=[single])
-:     :  +- Join(joinType=[LeftAntiJoin], where=[AND(OR(=(b, i), IS NULL(b), IS NULL(i)), =(c, k))], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-:     :     :- Exchange(distribution=[hash[c]])
+:  +- Join(joinType=[LeftAntiJoin], where=[AND(OR(IS NULL(b), IS NULL(i), =(b, i)), =(c, k))], select=[a, b, c], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])

Review comment:
       do you know which JIRA issue causes this join-order change without a join-order rule?

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
##########
@@ -57,16 +56,15 @@ WHERE rank_num >= 1 AND rank_num < 3
       <![CDATA[
 LogicalProject(a=[$0], rank_num=[$4])
 +- LogicalFilter(condition=[AND(>=($4, 1), <($4, 3))])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, w0$o0])
-+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
-   +- FlinkLogicalCalc(select=[a, rowtime])
-      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]])
+FlinkLogicalCalc(select=[a, w0$o0 AS rank_num], where=[SEARCH(w0$o0, Sarg[[1..3)])])
++- FlinkLogicalOverAggregate(window#0=[window(partition {0} order by [3 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
##########
@@ -68,15 +68,14 @@ Calc(select=[rtime])
     <Resource name="planBefore">
       <![CDATA[
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
-+- LogicalProject($f0=[1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, T, project=[]]], fields=[])
+   +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, name])

Review comment:
       bad case

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RankNumberColumnRemoveRuleTest.xml
##########
@@ -30,16 +30,15 @@ WHERE rank_num >= 1 AND rank_num < 2
       <![CDATA[
 LogicalProject(a=[$0], rank_num=[$4])
 +- LogicalFilter(condition=[AND(>=($4, 1), <($4, 2))])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[RANK() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], rank_num=[RANK() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalCalc(select=[a, w0$o0])
-+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[rowtime DESC], select=[a, rowtime, w0$o0])
-   +- FlinkLogicalCalc(select=[a, rowtime])
-      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]])
+FlinkLogicalCalc(select=[a, w0$o0 AS rank_num], where=[SEARCH(w0$o0, Sarg[[1..2)])])
++- FlinkLogicalOverAggregate(window#0=[window(partition {0} order by [3 DESC-nulls-last] aggs [RANK()])])

Review comment:
       bad case

##########
File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
##########
@@ -23,17 +23,16 @@ limitations under the License.
     <Resource name="planBefore">
       <![CDATA[
 LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
-+- LogicalProject($f0=[0])
-   +- LogicalFilter(condition=[>($1, 1)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, src]])
++- LogicalFilter(condition=[>($1, 1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
 +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
-   +- Calc(select=[0 AS $f0], where=[>(a, 1)], changelogMode=[I,UB,UA])
-      +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[], project=[a]]], fields=[a], changelogMode=[I,UB,UA])
+   +- Calc(select=[ts, a, b], where=[>(a, 1)], changelogMode=[I,UB,UA])

Review comment:
       bad case, more data will be shuffled

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
##########
@@ -91,35 +91,43 @@ class FlinkRelMdDistinctRowCountTest extends FlinkRelMdHandlerTestBase {
     assertEquals(1.0, mq.getDistinctRowCount(logicalProject, ImmutableBitSet.of(), null))
     assertEquals(50.0, mq.getDistinctRowCount(logicalProject, ImmutableBitSet.of(0), null))
     assertEquals(48.0, mq.getDistinctRowCount(logicalProject, ImmutableBitSet.of(1), null))
-    assertEquals(16.96, mq.getDistinctRowCount(logicalProject, ImmutableBitSet.of(2), null), 1e-2)
+    assertEquals(17.13976902522821,

Review comment:
       use `assertEquals` with the `delta` parameter; we only need to compare to two decimal places of precision.
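
   For instance, keeping the style used elsewhere in these tests (a rounded expected value plus a `delta`; the rounded value below is illustrative):

   ```scala
   assertEquals(17.14,
     mq.getDistinctRowCount(logicalProject, ImmutableBitSet.of(2), null), 1e-2)
   ```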



