You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/12/27 08:37:53 UTC

[flink] branch master updated: [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dc862dae28a [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
dc862dae28a is described below

commit dc862dae28a172f674a9b8a2198c603275304550
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Mon Dec 12 14:44:59 2022 +0800

    [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
    
    This closes #21490
---
 .../plan/metadata/FlinkRelMdDistinctRowCount.scala |  2 +-
 .../table/planner/plan/utils/FlinkRelMdUtil.scala  | 81 ++++++++++++++++++++++
 .../planner/plan/utils/FlinkRelMdUtilTest.scala    |  1 +
 3 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
index f74b7b0473b..86158e0c85a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
@@ -539,7 +539,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
         }
         mq.getDistinctRowCount(rel.getLeft, groupKey, newPred)
       case _ =>
-        RelMdUtil.getJoinDistinctRowCount(mq, rel, rel.getJoinType, groupKey, predicate, false)
+        FlinkRelMdUtil.getJoinDistinctRowCount(mq, rel, rel.getJoinType, groupKey, predicate, false)
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
index bab6f791433..a2038d0897b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
@@ -37,6 +37,7 @@ import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName.{TIME, TIMESTAMP}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.{ImmutableBitSet, NumberUtil}
+import org.apache.calcite.util.NumberUtil.multiply
 
 import java.math.BigDecimal
 import java.util
@@ -247,6 +248,86 @@ object FlinkRelMdUtil {
     RexUtil.composeConjunction(rexBuilder, pushable, true)
   }
 
+  /**
+   * Computes the NDV (number of distinct values) of a set of keys returned from a join.
+   * Copied from Calcite's RelMdUtil#getJoinDistinctRowCount; lines 324 ~ 328 are changed so the
+   * final estimate uses FlinkRelMdUtil#numDistinctVals, which does not wrongly return zero when
+   * domainSize is much larger than numSelected. Drop once CALCITE-4351 is fixed; see FLINK-19780.
+   *
+   * @param mq
+   *   metadata query used to look up the inputs' NDVs and the join's row count
+   * @param joinRel
+   *   RelNode representing the join; must be a Join, since it is cast with asInstanceOf
+   * @param joinType
+   *   type of join; decides which conjuncts of predicate may be pushed to the left/right input
+   * @param groupKey
+   *   keys that the distinct row count will be computed for, split into left/right input keys
+   * @param predicate
+   *   filter on the join output, may be null; only conjuncts pushable to an input are used
+   * @param useMaxNdv
+   *   if true use `max(left NDV, right NDV)`, otherwise use `left NDV * right NDV`
+   * @return
+   *   number of distinct rows, or null if an input NDV or the join row count is unknown
+   */
+  def getJoinDistinctRowCount(
+      mq: RelMetadataQuery,
+      joinRel: RelNode,
+      joinType: JoinRelType,
+      groupKey: ImmutableBitSet,
+      predicate: RexNode,
+      useMaxNdv: Boolean): JDouble = {
+    if ((predicate == null || predicate.isAlwaysTrue) && groupKey.isEmpty) {
+      return 1d
+    }
+    val join = joinRel.asInstanceOf[Join]
+    if (join.isSemiJoin) {
+      return RelMdUtil.getSemiJoinDistinctRowCount(join, mq, groupKey, predicate)
+    }
+    val leftMask = ImmutableBitSet.builder
+    val rightMask = ImmutableBitSet.builder
+    val left = joinRel.getInputs.get(0)
+    val right = joinRel.getInputs.get(1)
+    RelMdUtil.setLeftRightBitmaps(groupKey, leftMask, rightMask, left.getRowType.getFieldCount)
+    // split the predicate's conjuncts into those pushable to the left vs right input
+    val (leftPred, rightPred) = if (predicate != null) {
+      val leftFilters = new util.ArrayList[RexNode]
+      val rightFilters = new util.ArrayList[RexNode]
+      val joinFilters = new util.ArrayList[RexNode] // not used in the estimate below
+      val predList = RelOptUtil.conjunctions(predicate)
+      RelOptUtil.classifyFilters(
+        joinRel,
+        predList,
+        joinType.canPushIntoFromAbove,
+        joinType.canPushLeftFromAbove,
+        joinType.canPushRightFromAbove,
+        joinFilters,
+        leftFilters,
+        rightFilters)
+      val rexBuilder = joinRel.getCluster.getRexBuilder
+      val leftResult = RexUtil.composeConjunction(rexBuilder, leftFilters, true)
+      val rightResult = RexUtil.composeConjunction(rexBuilder, rightFilters, true)
+      (leftResult, rightResult)
+    } else {
+      (null, null)
+    }
+
+    val distRowCount = if (useMaxNdv) {
+      NumberUtil.max(
+        mq.getDistinctRowCount(left, leftMask.build, leftPred),
+        mq.getDistinctRowCount(right, rightMask.build, rightPred))
+    } else {
+      multiply(
+        mq.getDistinctRowCount(left, leftMask.build, leftPred),
+        mq.getDistinctRowCount(right, rightMask.build, rightPred))
+    }
+    val rowCount = mq.getRowCount(joinRel)
+    if (distRowCount == null || rowCount == null) { // unknown estimate: propagate null
+      null
+    } else {
+      FlinkRelMdUtil.numDistinctVals(distRowCount, rowCount) // Flink's version, see CALCITE-4351
+    }
+  }
+
   /**
    * Returns the number of distinct values provided numSelected are selected where there are
    * domainSize distinct values.
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala
index 03c3a676ffa..4081577f104 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala
@@ -39,6 +39,7 @@ class FlinkRelMdUtilTest {
   @Test
   def testNumDistinctValsWithLargeInputs(): Unit = {
     Assert.assertNotEquals(0.0, FlinkRelMdUtil.numDistinctVals(1e18, 1e10))
+    Assert.assertEquals(9.99999993922529e9, FlinkRelMdUtil.numDistinctVals(1e18, 1e10), 1d)
     // this test will fail once CALCITE-4351 is fixed
     // in that case FlinkRelMdUtil#numDistinctVals should be removed
     // see FLINK-19780