Posted to commits@impala.apache.org by ta...@apache.org on 2020/03/19 06:41:38 UTC

[impala] branch master updated: IMPALA-9530: query option to limit preagg memory

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ca53f68  IMPALA-9530: query option to limit preagg memory
ca53f68 is described below

commit ca53f68525504b6a1a64bb35c7edb57d030916d7
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Mar 16 17:09:18 2020 -0700

    IMPALA-9530: query option to limit preagg memory
    
    This adds an advanced PREAGG_BYTES_LIMIT query option that
    allows limiting the memory consumption of streaming
    preaggregation operators in a query.
    
    It works by setting a maximum reservation on each grouping
    aggregator in a preaggregation node. The aggregators switch
    to passthrough mode automatically when they hit this limit, just
    as they would if the query memory limit were reached.
    
    This does not override the minimum reservation computed for
    the aggregation: if the limit is less than the minimum
    reservation, the minimum reservation is used as the limit
    instead.
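
    For example, the option can be set like any other byte-valued
    query option (this mirrors the end-to-end test added below; 10m is
    an illustrative value, not a recommended default):

        set preagg_bytes_limit=10m;
        select count(distinct l_orderkey) from tpch_parquet.lineitem;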
    
    The default behaviour is unchanged.
    
    Testing:
    Add a planner test with estimates higher and lower than the limit
    to ensure that resource estimates correctly reflect the option.
    
    Add an end-to-end test that verifies that the option forces
    passthrough when the preagg bytes limit is hit.
    
    Change-Id: I87f7a5c68da93d068e304ef01afbcbb0d56807d9
    Reviewed-on: http://gerrit.cloudera.org:8080/15463
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options-test.cc               |   1 +
 be/src/service/query-options.cc                    |   8 ++
 be/src/service/query-options.h                     |   5 +-
 common/thrift/ImpalaInternalService.thrift         |   3 +
 common/thrift/ImpalaService.thrift                 |   4 +
 .../org/apache/impala/planner/AggregationNode.java |  15 ++-
 .../org/apache/impala/planner/PlannerTest.java     |  10 ++
 .../queries/PlannerTest/preagg-bytes-limit.test    | 137 +++++++++++++++++++++
 .../queries/tpch-passthrough-aggregations.test     |  26 +++-
 9 files changed, 203 insertions(+), 6 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 4387e8f..0cc30ed 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -152,6 +152,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(topn_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(mem_limit_executors), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(broadcast_bytes_limit), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(preagg_bytes_limit), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index a787865..7042d61 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -916,6 +916,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_broadcast_bytes_limit(broadcast_bytes_limit);
         break;
       }
+      case TImpalaQueryOptions::PREAGG_BYTES_LIMIT: {
+        // Parse the preaggregation bytes limit and validate it
+        int64_t preagg_bytes_limit;
+        RETURN_IF_ERROR(
+            ParseMemValue(value, "preaggregation bytes limit", &preagg_bytes_limit));
+        query_options->__set_preagg_bytes_limit(preagg_bytes_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 032bfb5..5bbd51a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::BROADCAST_BYTES_LIMIT + 1);\
+      TImpalaQueryOptions::PREAGG_BYTES_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -193,7 +193,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)
+  QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 24112c4..c70e682 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -407,6 +407,9 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   // The default value is set to 32 GB
   98: optional i64 broadcast_bytes_limit = 34359738368;
+
+  // See comment in ImpalaService.thrift
+  99: optional i64 preagg_bytes_limit = -1;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 3a733c5..7065715 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -503,6 +503,10 @@ enum TImpalaQueryOptions {
   // exchange will exceed this limit, it will not consider a broadcast and instead
   // fall back on a hash partition exchange. 0 or -1 means this has no effect.
   BROADCAST_BYTES_LIMIT = 97
+
+  // The max reservation that each grouping class in a preaggregation will use.
+  // 0 or -1 means this has no effect.
+  PREAGG_BYTES_LIMIT = 98
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 239afc6..b8d271c 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -573,11 +573,20 @@ public class AggregationNode extends PlanNode {
       }
     }
 
-    return new ResourceProfileBuilder()
+    ResourceProfileBuilder builder = new ResourceProfileBuilder()
         .setMemEstimateBytes(perInstanceMemEstimate)
         .setMinMemReservationBytes(perInstanceMinMemReservation)
         .setSpillableBufferBytes(bufferSize)
-        .setMaxRowBufferBytes(maxRowBufferSize)
-        .build();
+        .setMaxRowBufferBytes(maxRowBufferSize);
+    if (useStreamingPreagg_ && queryOptions.getPreagg_bytes_limit() > 0) {
+      long maxReservationBytes =
+          Math.max(perInstanceMinMemReservation, queryOptions.getPreagg_bytes_limit());
+      builder.setMaxMemReservationBytes(maxReservationBytes);
+      // Aggregations should generally not use significantly more than the max
+      // reservation, since the bulk of the memory is reserved.
+      builder.setMemEstimateBytes(
+          Math.min(perInstanceMemEstimate, maxReservationBytes));
+    }
+    return builder.build();
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index fa81c84..a19a584 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1018,4 +1018,14 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("broadcast-bytes-limit-large", "tpch_parquet", options);
   }
 
+  /**
+   * Check that planner estimates reflect the preagg bytes limit.
+   */
+  @Test
+  public void testPreaggBytesLimit() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setPreagg_bytes_limit(64 * 1024 * 1024);
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("preagg-bytes-limit", "tpch_parquet", options);
+  }
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/preagg-bytes-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/preagg-bytes-limit.test
new file mode 100644
index 0000000..5e923cf
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/preagg-bytes-limit.test
@@ -0,0 +1,137 @@
+# Query where estimate for preagg is higher than PREAGG_BYTES_LIMIT
+select distinct l_orderkey, l_partkey, l_suppkey from tpch_parquet.lineitem
+---- DISTRIBUTEDPLAN
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=10.08MB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: l_orderkey, l_partkey, l_suppkey
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 03(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(l_orderkey,l_partkey,l_suppkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=85.63MB mem-reservation=34.00MB thread-reservation=1
+03:AGGREGATE [FINALIZE]
+|  group by: l_orderkey, l_partkey, l_suppkey
+|  mem-estimate=75.55MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:EXCHANGE [HASH(l_orderkey,l_partkey,l_suppkey)]
+|  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=304.00MB mem-reservation=50.00MB thread-reservation=2
+01:AGGREGATE [STREAMING]
+|  group by: l_orderkey, l_partkey, l_suppkey
+|  mem-estimate=64.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=240.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# Query where estimate for preagg is lower than PREAGG_BYTES_LIMIT
+select distinct l_suppkey from tpch_parquet.lineitem
+---- DISTRIBUTEDPLAN
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=61.29KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: l_suppkey
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=61.29KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=9.71K
+|  in pipelines: 03(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(l_suppkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=10.06MB mem-reservation=1.94MB thread-reservation=1
+03:AGGREGATE [FINALIZE]
+|  group by: l_suppkey
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=9.71K
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:EXCHANGE [HASH(l_suppkey)]
+|  mem-estimate=61.29KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=9.71K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=90.00MB mem-reservation=6.00MB thread-reservation=2
+01:AGGREGATE [STREAMING]
+|  group by: l_suppkey
+|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=9.71K
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# Query where estimate for preagg is less than minimum reservation
+select distinct l_orderkey, l_partkey, l_suppkey from tpch_parquet.lineitem
+---- QUERYOPTIONS
+DEFAULT_SPILLABLE_BUFFER_SIZE=8M
+---- DISTRIBUTEDPLAN
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=10.08MB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: l_orderkey, l_partkey, l_suppkey
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+04:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 03(GETNEXT)
+|
+F01:PLAN FRAGMENT [HASH(l_orderkey,l_partkey,l_suppkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=146.08MB mem-reservation=136.00MB thread-reservation=1
+03:AGGREGATE [FINALIZE]
+|  group by: l_orderkey, l_partkey, l_suppkey
+|  mem-estimate=136.00MB mem-reservation=136.00MB spill-buffer=8.00MB thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:EXCHANGE [HASH(l_orderkey,l_partkey,l_suppkey)]
+|  mem-estimate=10.08MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=376.00MB mem-reservation=152.00MB thread-reservation=2
+01:AGGREGATE [STREAMING]
+|  group by: l_orderkey, l_partkey, l_suppkey
+|  mem-estimate=136.00MB mem-reservation=136.00MB spill-buffer=8.00MB thread-reservation=0
+|  tuple-ids=1 row-size=24B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=240.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
diff --git a/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test b/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test
index ef8a9d2..5b9b1c5 100644
--- a/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test
+++ b/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test
@@ -86,4 +86,28 @@ string,string,int,bigint
 # Verify that passthrough was activated.
 row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
 ====
-
+---- QUERY
+# Test that, without preagg_bytes_limit, this query does not pass through rows.
+set default_spillable_buffer_size=64k;
+select count(distinct l_orderkey) from tpch_parquet.lineitem
+---- RESULTS
+1500000
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that passthrough was not activated.
+row_regex: .*RowsPassedThrough: 0
+====
+---- QUERY
+# Test that preagg_bytes_limit forces the preagg to pass through rows.
+set default_spillable_buffer_size=64k;
+set preagg_bytes_limit=10m;
+select count(distinct l_orderkey) from tpch_parquet.lineitem
+---- RESULTS
+1500000
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that passthrough was activated.
+row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
+====