You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/07/28 21:15:16 UTC

[impala] branch master updated: IMPALA-11924: Cap runtime filter NDV with build key NDV

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 12dee267f IMPALA-11924: Cap runtime filter NDV with build key NDV
12dee267f is described below

commit 12dee267fc56bcfef757285d1c698dfc241d2a05
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu Feb 16 17:33:48 2023 +0100

    IMPALA-11924: Cap runtime filter NDV with build key NDV
    
    Before this patch, the NDV used for bloom filter sizing was based only
    on the cardinality of the build side. This is ok for FK/PK joins but
    can highly overestimate NDV if the build key column's NDV is smaller
    than the number of rows.
    
    This change takes the minimum of the build key NDV (which is not reduced
    by build-side selectivity) and the build-side cardinality (which is).
    
    Testing:
    - Adjust test_bloom_filters and test_row_filters, raising the NDV of
      the build key in the affected test cases so that their assertions
      still hold.
    - Add an 8KB bloom filter test case to test_bloom_filters.
    
    Change-Id: Idaa46789663cb2e6d29f518757d89c85ff8e4d1a
    Reviewed-on: http://gerrit.cloudera.org:8080/19506
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Michael Smith <mi...@cloudera.com>
---
 .../impala/planner/RuntimeFilterGenerator.java     | 39 +++++++++++++++++++---
 .../queries/QueryTest/bloom_filters.test           | 14 +++++++-
 .../queries/QueryTest/runtime_row_filters.test     | 13 +++++---
 3 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index a9f526fac..0e4a6b36e 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -214,8 +214,12 @@ public final class RuntimeFilterGenerator {
     // for the filter. A value of -1 means no estimate is available, and default filter
     // parameters should be used.
     private long ndvEstimate_ = -1;
+    // NDV of the build side key expression.
+    private long buildKeyNdv_ = -1;
     // Size of the filter (in Bytes). Should be greater than zero for bloom filters.
     private long filterSizeBytes_ = 0;
+    // Size of the filter (in Bytes) before applying min/max filter sizes.
+    private long originalFilterSizeBytes_ = 0;
     // If true, the filter is produced by a broadcast join and there is at least one
     // destination scan node which is in the same fragment as the join; set in
     // DistributedPlanner.createHashJoinFragment().
@@ -658,7 +662,18 @@ public final class RuntimeFilterGenerator {
 
     public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin_ = isBroadcast; }
 
-    public void computeNdvEstimate() { ndvEstimate_ = src_.getChild(1).getCardinality(); }
+    public void computeNdvEstimate() {
+      ndvEstimate_ = src_.getChild(1).getCardinality();
+      buildKeyNdv_ = srcExpr_.getNumDistinctValues();
+
+      // Build side selectiveness is calculated into the cardinality but not into
+      // buildKeyNdv_. This can lead to overestimating NDV compared to how the Join node
+      // estimates build side NDV (JoinNode.getGenericJoinCardinalityInternal()).
+      if (buildKeyNdv_ >= 0) {
+        ndvEstimate_ =
+            ndvEstimate_ >= 0 ? Math.min(buildKeyNdv_, ndvEstimate_) : buildKeyNdv_;
+      }
+    }
 
     public void computeHasLocalTargets() {
       Preconditions.checkNotNull(src_.getFragment());
@@ -696,7 +711,8 @@ public final class RuntimeFilterGenerator {
       }
       double targetFpp = filterSizeLimits.targetFpp;
       int logFilterSize = FeSupport.GetMinLogSpaceForBloomFilter(ndvEstimate_, targetFpp);
-      filterSizeBytes_ = 1L << logFilterSize;
+      originalFilterSizeBytes_ = 1L << logFilterSize;
+      filterSizeBytes_ = originalFilterSizeBytes_;
       filterSizeBytes_ = Math.max(filterSizeBytes_, filterSizeLimits.minVal);
       filterSizeBytes_ = Math.min(filterSizeBytes_, filterSizeLimits.maxVal);
     }
@@ -718,7 +734,12 @@ public final class RuntimeFilterGenerator {
           .append("SrcExpr: " + getSrcExpr().debugString() +  " ")
           .append("Target(s): ")
           .append(Joiner.on(", ").join(targets_) + " ")
-          .append("Selectivity: " + getSelectivity()).toString();
+          .append("Selectivity: " + getSelectivity() + " ")
+          .append("Build key NDV: " + buildKeyNdv_ + " ")
+          .append("NDV estimate " + ndvEstimate_ + " ")
+          .append("Filter size (bytes): " + filterSizeBytes_ + " ")
+          .append("Original filter size (bytes): " + originalFilterSizeBytes_ + " ")
+          .toString();
     }
   }
 
@@ -754,14 +775,22 @@ public final class RuntimeFilterGenerator {
     int numBloomFilters = 0;
     for (RuntimeFilter filter : filters) {
       if (filter.getType() == TRuntimeFilterType.BLOOM) {
-        if (numBloomFilters >= maxNumBloomFilters) continue;
+        if (numBloomFilters >= maxNumBloomFilters) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Skip runtime filter (already reached max runtime filter count): "
+                + filter.debugString());
+          }
+          continue;
+        }
         ++numBloomFilters;
       }
       DistributionMode distMode = filter.src_.getDistributionMode();
       filter.setIsBroadcast(distMode == DistributionMode.BROADCAST);
       if (filter.getType() == TRuntimeFilterType.IN_LIST
           && distMode == DistributionMode.PARTITIONED) {
-        LOG.trace("Skip IN-list filter on partitioned join: {}", filter.debugString());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Skip IN-list filter on partitioned join: {}", filter.debugString());
+        }
         continue;
       }
       filter.computeHasLocalTargets();
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
index 6cb478083..a11d1b271 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
@@ -13,6 +13,17 @@
 ####################################################
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=8KB;
+select count(*) from tpch.orders join tpch.customer on o_comment = c_mktsegment;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
 SET RUNTIME_FILTER_MIN_SIZE=64KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
@@ -53,7 +64,8 @@ row_regex: .*Filter 0 \(512.00 KB\).*
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
 SET RUNTIME_FILTER_MIN_SIZE=64KB;
-with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+with l as (select l_orderkey from tpch.lineitem UNION ALL
+           select l_orderkey + 6000000 from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;
 ---- RESULTS
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index 583e627cf..33270bb90 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -261,12 +261,17 @@ aggregation(SUM, ProbeRows): 7300
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 with l as (select l_orderkey from tpch_parquet.lineitem UNION ALL
-           select l_orderkey from tpch_parquet.lineitem UNION ALL
-           select l_orderkey from tpch_parquet.lineitem UNION ALL
-           select l_orderkey from tpch_parquet.lineitem)
+           select l_orderkey + (6000000 * 1) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 2) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 3) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 4) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 5) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 6) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 7) from tpch_parquet.lineitem UNION ALL
+           select l_orderkey + (6000000 * 8) from tpch_parquet.lineitem)
 select STRAIGHT_JOIN count(*) from
     (select l_orderkey from tpch_parquet.lineitem a LIMIT 1) a
-    join (select l_orderkey from l UNION ALL select l_orderkey from l) b
+    join (select l_orderkey from l UNION ALL select l_orderkey + 100000000 from l) b
     on a.l_orderkey = -b.l_orderkey
 ---- RESULTS
 0