You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/12/27 08:37:53 UTC
[flink] branch master updated: [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dc862dae28a [FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
dc862dae28a is described below
commit dc862dae28a172f674a9b8a2198c603275304550
Author: zhengyunhong.zyh <33...@qq.com>
AuthorDate: Mon Dec 12 14:44:59 2022 +0800
[FLINK-30368][table-planner] Fix calcite method RelMdUtil$numDistinctVals() wrongly return zero if the method input domainSize much larger than numSelected
This closes #21490
---
.../plan/metadata/FlinkRelMdDistinctRowCount.scala | 2 +-
.../table/planner/plan/utils/FlinkRelMdUtil.scala | 81 ++++++++++++++++++++++
.../planner/plan/utils/FlinkRelMdUtilTest.scala | 1 +
3 files changed, 83 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
index f74b7b0473b..86158e0c85a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
@@ -539,7 +539,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
}
mq.getDistinctRowCount(rel.getLeft, groupKey, newPred)
case _ =>
- RelMdUtil.getJoinDistinctRowCount(mq, rel, rel.getJoinType, groupKey, predicate, false)
+ FlinkRelMdUtil.getJoinDistinctRowCount(mq, rel, rel.getJoinType, groupKey, predicate, false)
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
index bab6f791433..a2038d0897b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala
@@ -37,6 +37,7 @@ import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeName.{TIME, TIMESTAMP}
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.{ImmutableBitSet, NumberUtil}
+import org.apache.calcite.util.NumberUtil.multiply
import java.math.BigDecimal
import java.util
@@ -247,6 +248,86 @@ object FlinkRelMdUtil {
RexUtil.composeConjunction(rexBuilder, pushable, true)
}
+ /**
+ * Copied from calcite RelMdUtil#getJoinDistinctRowCount; the final step is changed to call
+ * FlinkRelMdUtil.numDistinctVals (after null checks) instead of RelMdUtil.numDistinctVals.
+ * This method should be removed once CALCITE-4351 is fixed. See CALCITE-4351 and FLINK-19780.
+ *
+ * Computes the NDV (number of distinct values) of a set of keys returned from a join.
+ *
+ * @param joinRel
+ * RelNode representing the join
+ * @param joinType
+ * type of join
+ * @param groupKey
+ * keys that the distinct row count will be computed for
+ * @param predicate
+ * join predicate
+ * @param useMaxNdv
+ * If true use formula <code>max(left NDV, right NDV)</code>, otherwise use <code>left NDV *
+ * right NDV</code>.
+ * @return
+ * number of distinct rows, or null if an input NDV or the join row count is null
+ */
+ def getJoinDistinctRowCount(
+ mq: RelMetadataQuery,
+ joinRel: RelNode,
+ joinType: JoinRelType,
+ groupKey: ImmutableBitSet,
+ predicate: RexNode,
+ useMaxNdv: Boolean): JDouble = {
+ if ((predicate == null || predicate.isAlwaysTrue) && groupKey.isEmpty) {
+ return 1d
+ }
+ val join = joinRel.asInstanceOf[Join]
+ if (join.isSemiJoin) {
+ return RelMdUtil.getSemiJoinDistinctRowCount(join, mq, groupKey, predicate)
+ }
+ val leftMask = ImmutableBitSet.builder
+ val rightMask = ImmutableBitSet.builder
+ val left = joinRel.getInputs.get(0)
+ val right = joinRel.getInputs.get(1)
+ RelMdUtil.setLeftRightBitmaps(groupKey, leftMask, rightMask, left.getRowType.getFieldCount)
+ // determine which filters apply to the left vs right
+ val (leftPred, rightPred) = if (predicate != null) {
+ val leftFilters = new util.ArrayList[RexNode]
+ val rightFilters = new util.ArrayList[RexNode]
+ val joinFilters = new util.ArrayList[RexNode]
+ val predList = RelOptUtil.conjunctions(predicate)
+ RelOptUtil.classifyFilters(
+ joinRel,
+ predList,
+ joinType.canPushIntoFromAbove,
+ joinType.canPushLeftFromAbove,
+ joinType.canPushRightFromAbove,
+ joinFilters,
+ leftFilters,
+ rightFilters)
+ val rexBuilder = joinRel.getCluster.getRexBuilder
+ val leftResult = RexUtil.composeConjunction(rexBuilder, leftFilters, true)
+ val rightResult = RexUtil.composeConjunction(rexBuilder, rightFilters, true)
+ (leftResult, rightResult)
+ } else {
+ (null, null)
+ }
+
+ val distRowCount = if (useMaxNdv) {
+ NumberUtil.max(
+ mq.getDistinctRowCount(left, leftMask.build, leftPred),
+ mq.getDistinctRowCount(right, rightMask.build, rightPred))
+ } else {
+ multiply(
+ mq.getDistinctRowCount(left, leftMask.build, leftPred),
+ mq.getDistinctRowCount(right, rightMask.build, rightPred))
+ }
+ val rowCount = mq.getRowCount(joinRel)
+ if (distRowCount == null || rowCount == null) {
+ null
+ } else {
+ FlinkRelMdUtil.numDistinctVals(distRowCount, rowCount)
+ }
+ }
+
/**
* Returns the number of distinct values provided numSelected are selected where there are
* domainSize distinct values.
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala
index 03c3a676ffa..4081577f104 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtilTest.scala
@@ -39,6 +39,7 @@ class FlinkRelMdUtilTest {
@Test
def testNumDistinctValsWithLargeInputs(): Unit = {
Assert.assertNotEquals(0.0, FlinkRelMdUtil.numDistinctVals(1e18, 1e10))
+ Assert.assertEquals(9.99999993922529e9, FlinkRelMdUtil.numDistinctVals(1e18, 1e10), 1d)
// this test will fail once CALCITE-4351 is fixed
// in that case FlinkRelMdUtil#numDistinctVals should be removed
// see FLINK-19780