You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/07/13 15:21:51 UTC

[impala] 01/02: IMPALA-11842: Improve memory estimation for Aggregate

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9070895ed3b0ebb2506ddbf9d7bda9ffc1089bf6
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue Jun 13 13:16:18 2023 -0700

    IMPALA-11842: Improve memory estimation for Aggregate
    
    Planner often overestimates the memory needed by aggregation nodes since
    it uses simple multiplication of NDVs of contributing grouping columns.
    This patch introduces new query options LARGE_AGG_MEM_THRESHOLD and
    AGG_MEM_CORRELATION_FACTOR. If the estimated perInstanceDataBytes from
    the NDV multiplication method exceeds LARGE_AGG_MEM_THRESHOLD, recompute
    perInstanceDataBytes by comparing against the max(NDV) &
    AGG_MEM_CORRELATION_FACTOR method.
    
    perInstanceDataBytes is kept at LARGE_AGG_MEM_THRESHOLD at a minimum so
    that low max(NDV) will not negatively impact query execution. Unlike
    PREAGG_BYTES_LIMIT, LARGE_AGG_MEM_THRESHOLD is evaluated on both
    preaggregation and final aggregation, and does not cap max memory
    reservation of the aggregation node (it may still increase memory
    allocation beyond the estimate if it is available). However, if a plan
    node is a streaming preaggregation node and PREAGG_BYTES_LIMIT is set,
    then PREAGG_BYTES_LIMIT will override the value of
    LARGE_AGG_MEM_THRESHOLD as a threshold.
    
    Testing:
    - Run the patch with 10 nodes, MT_DOP=12, against TPC-DS 3TB scale.
      Among 103 queries, 20 queries have lower
      "Per-Host Resource Estimates", 11 have lower "Cluster Memory
      Admitted", and 3 have over 10% reduced latency. No significant
      regression in query latency was observed.
    - Pass core tests.
    
    Change-Id: Ia4b4b2e519ee89f0a13fdb62d0471ee4047f6421
    Reviewed-on: http://gerrit.cloudera.org:8080/20104
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options-test.cc               |   1 +
 be/src/service/query-options.cc                    |  13 +
 be/src/service/query-options.h                     |   6 +-
 common/thrift/ImpalaService.thrift                 |  21 +
 common/thrift/Query.thrift                         |   8 +
 .../org/apache/impala/planner/AggregationNode.java | 129 ++++--
 .../org/apache/impala/planner/HdfsTableSink.java   |   3 +-
 .../org/apache/impala/planner/PlanFragment.java    |  18 +-
 .../org/apache/impala/planner/CardinalityTest.java |  11 +-
 .../org/apache/impala/planner/PlannerTest.java     |  32 +-
 .../org/apache/impala/planner/PlannerTestBase.java |  19 +-
 .../apache/impala/planner/TpcdsPlannerTest.java    |  16 +-
 ...cds-q67.test => agg-node-max-mem-estimate.test} | 487 +++++++++++++++++++++
 .../queries/PlannerTest/resource-requirements.test |   6 +-
 .../queries/PlannerTest/tpcds/tpcds-q22.test       |  32 +-
 .../queries/PlannerTest/tpcds/tpcds-q67.test       |  18 +-
 16 files changed, 714 insertions(+), 106 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index f4e9ab9cf..a1c8182bf 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -159,6 +159,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(scratch_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_result_spooling_mem), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_spilled_result_spooling_mem), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(large_agg_mem_threshold), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c01ffc654..d95c10418 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1127,6 +1127,19 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_values_stmt_avoid_lossy_char_padding(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::LARGE_AGG_MEM_THRESHOLD: {
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
+        query_options->__set_large_agg_mem_threshold(mem_spec_val.value);
+        break;
+      }
+      case TImpalaQueryOptions::AGG_MEM_CORRELATION_FACTOR: {
+        double double_val = 0.0f;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<double>(
+            option, value, 0.0, 1.0, &double_val));
+        query_options->__set_agg_mem_correlation_factor(double_val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 79b8e6646..bb859b4fe 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::VALUES_STMT_AVOID_LOSSY_CHAR_PADDING + 1);                    \
+      TImpalaQueryOptions::AGG_MEM_CORRELATION_FACTOR + 1);                              \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -300,6 +300,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)                                                       \
   QUERY_OPT_FN(values_stmt_avoid_lossy_char_padding,                                     \
       VALUES_STMT_AVOID_LOSSY_CHAR_PADDING, TQueryOptionLevel::REGULAR)                  \
+  QUERY_OPT_FN(                                                                          \
+      large_agg_mem_threshold, LARGE_AGG_MEM_THRESHOLD, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(agg_mem_correlation_factor, AGG_MEM_CORRELATION_FACTOR,                   \
+      TQueryOptionLevel::ADVANCED)                                                       \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 49862e892..726264079 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -812,6 +812,27 @@ enum TImpalaQueryOptions {
   // CHAR type as the common type. This avoids padding and thereby loss of information.
   // See IMPALA-10753.
   VALUES_STMT_AVOID_LOSSY_CHAR_PADDING = 161;
+
+  // Threshold in bytes to determine whether an aggregation node's memory estimate is
+  // deemed large. If an aggregation node's memory estimate is large, an alternative
+  // estimation is used to lower the memory usage estimation for that aggregation node.
+  // The new memory estimate will not be lower than the specified
+  // LARGE_AGG_MEM_THRESHOLD. Unlike PREAGG_BYTES_LIMIT, LARGE_AGG_MEM_THRESHOLD is
+  // evaluated on both preagg and merge agg, and does not cap max memory reservation of
+  // the aggregation node (it may still increase memory allocation beyond the threshold
+  // if it is available). However, if a plan node is a streaming preaggregation node and
+  // PREAGG_BYTES_LIMIT is set, then PREAGG_BYTES_LIMIT will override the value of
+  // LARGE_AGG_MEM_THRESHOLD as a threshold. 0 or -1 means this option has no effect.
+  LARGE_AGG_MEM_THRESHOLD = 162
+
+  // Correlation factor that will be used to calculate a lower memory estimation of
+  // aggregation node when the default memory estimation exceeds
+  // LARGE_AGG_MEM_THRESHOLD. Given N as number of grouping expressions,
+  // the final correlation factor is calculated as:
+  //   corrFactor = AGG_MEM_CORRELATION_FACTOR ^ N
+  // Valid values are in [0.0, 1.0]. Setting value 1.0 will result in an equal memory
+  // estimate as the default estimation (no change). Default to 0.5.
+  AGG_MEM_CORRELATION_FACTOR = 163
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 6a610f4b0..e67aba9c8 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -114,6 +114,8 @@ const i32 NUM_NODES_ALL_RACKS = -1
 // constant used as upperbound for TQueryOptions.processing_cost_min_threads and
 // TQueryOptions.max_fragment_instances_per_node
 const i32 MAX_FRAGMENT_INSTANCES_PER_NODE = 128
+// Conservative minimum size of hash table for low-cardinality aggregations.
+const i64 MIN_HASH_TBL_MEM = 10485760  // 10MB
 
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
 // respective defaults. Query options can be set in the following ways:
@@ -649,6 +651,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   162: optional bool values_stmt_avoid_lossy_char_padding = false;
+
+  // See comment in ImpalaService.thrift
+  163: optional i64 large_agg_mem_threshold = 536870912  // 512MB
+
+  // See comment in ImpalaService.thrift
+  164: optional double agg_mem_correlation_factor = 0.5
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 641d63e9c..dae282169 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -35,6 +35,7 @@ import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.ValidTupleIdExpr;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.QueryConstants;
 import org.apache.impala.thrift.TAggregationNode;
 import org.apache.impala.thrift.TAggregator;
 import org.apache.impala.thrift.TExplainLevel;
@@ -82,9 +83,6 @@ public class AggregationNode extends PlanNode {
   // Set in computeNodeResourceProfile().
   private List<ResourceProfile> resourceProfiles_;
 
-  // Conservative minimum size of hash table for low-cardinality aggregations.
-  protected final static long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
-
   // If the group clause is empty ( aggInfo.getGroupingExprs() is empty ),
   // the hash table will not be created.
   // Peak memory is at least 16k, which is an empirical value
@@ -596,60 +594,105 @@ public class AggregationNode extends PlanNode {
     }
   }
 
+  private long estimatePerInstanceDataBytes(
+      long perInstanceCardinality, long inputCardinality) {
+    Preconditions.checkArgument(perInstanceCardinality > -1);
+    // Per-instance cardinality cannot be greater than the total input cardinality
+    // going into this aggregation class.
+    if (inputCardinality != -1) {
+      // Calculate the input cardinality distributed across fragment instances.
+      long numInstances = fragment_.getNumInstances();
+      long perInstanceInputCardinality;
+      if (numInstances > 1) {
+        if (useStreamingPreagg_) {
+          // A skew factor was added to account for data skew among
+          // multiple fragment instances.
+          // This number was derived using empirical analysis of real-world
+          // and benchmark (tpch, tpcds) queries.
+          perInstanceInputCardinality = (long) Math.ceil(
+              ((double) inputCardinality / numInstances) * DEFAULT_SKEW_FACTOR);
+        } else {
+          // The data is distributed through hash, it will be more balanced.
+          perInstanceInputCardinality =
+              (long) Math.ceil((double) inputCardinality / numInstances);
+        }
+      } else {
+        // When numInstances is 1 or unknown(-1), perInstanceInputCardinality is the
+        // same as inputCardinality.
+        perInstanceInputCardinality = inputCardinality;
+      }
+
+      if (useStreamingPreagg_) {
+        // A reduction factor of 2 (input rows divided by output rows) was
+        // added to grow hash tables. If the reduction factor is lower than 2,
+        // only part of the data will be inserted into the hash table.
+        perInstanceCardinality =
+            Math.min(perInstanceCardinality, perInstanceInputCardinality / 2);
+      } else {
+        perInstanceCardinality =
+            Math.min(perInstanceCardinality, perInstanceInputCardinality);
+      }
+    }
+    // The memory of the data stored in hash table and the memory of the
+    // hash table's structure
+    long perInstanceDataBytes = (long) Math.ceil(
+        perInstanceCardinality * (avgRowSize_ + PlannerContext.SIZE_OF_BUCKET));
+    return perInstanceDataBytes;
+  }
+
   private ResourceProfile computeAggClassResourceProfile(
       TQueryOptions queryOptions, AggregateInfo aggInfo, long inputCardinality) {
     Preconditions.checkNotNull(
         fragment_, "PlanNode must be placed into a fragment before calling this method.");
-    long perInstanceCardinality = fragment_.getPerInstanceNdv(aggInfo.getGroupingExprs());
+    long perInstanceCardinality =
+        fragment_.getPerInstanceNdv(aggInfo.getGroupingExprs(), false);
     long perInstanceMemEstimate;
     long perInstanceDataBytes = -1;
+
+    // Determine threshold for large aggregation node.
+    long largeAggMemThreshold = Long.MAX_VALUE;
+    if (useStreamingPreagg_ && queryOptions.getPreagg_bytes_limit() > 0) {
+      largeAggMemThreshold = queryOptions.getPreagg_bytes_limit();
+    } else if (queryOptions.getLarge_agg_mem_threshold() > 0) {
+      largeAggMemThreshold = queryOptions.getLarge_agg_mem_threshold();
+    }
+
     if (perInstanceCardinality == -1) {
       perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
     } else {
-      // Per-instance cardinality cannot be greater than the total input cardinality
-      // going into this aggregation class.
-      if (inputCardinality != -1) {
-        // Calculate the input cardinality distributed across fragment instances.
-        long numInstances = fragment_.getNumInstances();
-        long perInstanceInputCardinality;
-        if (numInstances > 1) {
-          if (useStreamingPreagg_) {
-            // A skew factor was added to account for data skew among
-            // multiple fragment instances.
-            // This number was derived using empirical analysis of real-world
-            // and benchmark (tpch, tpcds) queries.
-            perInstanceInputCardinality = (long) Math.ceil(
-                ((double) inputCardinality / numInstances) * DEFAULT_SKEW_FACTOR);
-          } else {
-            // The data is distributed through hash, it will be more balanced.
-            perInstanceInputCardinality =
-                (long) Math.ceil((double) inputCardinality / numInstances);
-          }
-        } else {
-          // When numInstances is 1 or unknown(-1), perInstanceInputCardinality is the
-          // same as inputCardinality.
-          perInstanceInputCardinality = inputCardinality;
-        }
-
-        if (useStreamingPreagg_) {
-          // A reduction factor of 2 (input rows divided by output rows) was
-          // added to grow hash tables. If the reduction factor is lower than 2,
-          // only part of the data will be inserted into the hash table.
-          perInstanceCardinality =
-              Math.min(perInstanceCardinality, perInstanceInputCardinality / 2);
-        } else {
-          perInstanceCardinality =
-              Math.min(perInstanceCardinality, perInstanceInputCardinality);
+      perInstanceDataBytes =
+          estimatePerInstanceDataBytes(perInstanceCardinality, inputCardinality);
+      if (perInstanceDataBytes > largeAggMemThreshold) {
+        // Should try to schedule agg node with lower memory estimation than
+        // current perInstanceDataBytes. This is fine since a preagg node can pass
+        // rows through under memory pressure, while a final agg node can spill to disk.
+        // Try to come up with lower memory requirement by using max(NDV) and
+        // AGG_MEM_CORRELATION_FACTOR to estimate a lower perInstanceCardinality rather
+        // than the default NDV multiplication method.
+        long lowPerInstanceCardinality =
+            fragment_.getPerInstanceNdv(aggInfo.getGroupingExprs(), true);
+        Preconditions.checkState(lowPerInstanceCardinality > -1);
+        long lowPerInstanceDataBytes = Math.max(largeAggMemThreshold,
+            estimatePerInstanceDataBytes(lowPerInstanceCardinality, inputCardinality));
+        Preconditions.checkState(lowPerInstanceDataBytes <= perInstanceDataBytes);
+
+        // Given N as number of grouping expressions,
+        // corrFactor = AGG_MEM_CORRELATION_FACTOR ^ N
+        double corrFactor = Math.pow(queryOptions.getAgg_mem_correlation_factor(),
+            aggInfo.getGroupingExprs().size());
+        long resolvedPerInstanceDataBytes = lowPerInstanceDataBytes
+            + Math.round(corrFactor * (perInstanceDataBytes - lowPerInstanceDataBytes));
+        if (LOG.isTraceEnabled() && perInstanceDataBytes > resolvedPerInstanceDataBytes) {
+          LOG.trace("Node " + getDisplayLabel() + " reduce perInstanceDataBytes from "
+              + perInstanceDataBytes + " to " + resolvedPerInstanceDataBytes);
         }
+        perInstanceDataBytes = resolvedPerInstanceDataBytes;
       }
-      // The memory of the data stored in hash table and the memory of the
-      // hash table‘s structure
-      perInstanceDataBytes = (long)Math.ceil(perInstanceCardinality *
-                                  (avgRowSize_ + PlannerContext.SIZE_OF_BUCKET));
       if (aggInfo.getGroupingExprs().isEmpty()) {
         perInstanceMemEstimate = MIN_PLAIN_AGG_MEM;
       } else {
-        perInstanceMemEstimate = (long)Math.max(perInstanceDataBytes, MIN_HASH_TBL_MEM);
+        perInstanceMemEstimate =
+            Math.max(perInstanceDataBytes, QueryConstants.MIN_HASH_TBL_MEM);
       }
     }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 594455596..397c81b74 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -163,7 +163,8 @@ public class HdfsTableSink extends TableSink {
       // If the insert is clustered, it produces a single partition at a time.
       numBufferedPartitionsPerInstance = 1;
     } else {
-      numBufferedPartitionsPerInstance = fragment_.getPerInstanceNdv(partitionKeyExprs_);
+      numBufferedPartitionsPerInstance =
+          fragment_.getPerInstanceNdv(partitionKeyExprs_, false);
       if (numBufferedPartitionsPerInstance == -1) {
         numBufferedPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
       }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index ac43508cb..44b52f152 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -552,17 +552,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   }
 
   /**
-    * data partition of this fragment, the number of nodes, and the degree of parallelism.
-    * Returns -1 for an invalid estimate, e.g., because getNumDistinctValues() failed on
-    * one of the exprs.
-    */
-  public long getPerInstanceNdv(List<Expr> exprs) {
+   * Estimates the number of distinct values of exprs per fragment instance based on the
+   * data partition of this fragment, the number of nodes, and the degree of parallelism.
+   * By default, it estimates the number of distinct values by using simple multiplication
+   * of each expr.getNumDistinctValues(), and divides it by numInstances if a partition
+   * column exists in an expr. However, if useMaxNdv is true, this method will return the
+   * minimum between the default estimation vs the maximum expr.getNumDistinctValues().
+   * Returns -1 for an invalid estimate, e.g., because getNumDistinctValues() failed on
+   * one of the exprs.
+   */
+  public long getPerInstanceNdv(List<Expr> exprs, boolean useMaxNdv) {
     Preconditions.checkNotNull(dataPartition_);
     long result = 1;
     int numInstances = getNumInstances();
     Preconditions.checkState(numInstances >= 0);
     // The number of nodes is zero for empty tables.
     if (numInstances == 0) return 0;
+    long maxNdv = 1;
     boolean partition = false;
     for (Expr expr: exprs) {
       long numDistinct = expr.getNumDistinctValues();
@@ -574,10 +580,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         partition = true;
       }
       result = PlanNode.checkedMultiply(result, numDistinct);
+      maxNdv = Math.max(maxNdv, numDistinct);
     }
     if (partition) {
       result = (long)Math.max((double) result / (double) numInstances, 1L);
     }
+    if (useMaxNdv && result > maxNdv) result = maxNdv;
     return result;
   }
 
diff --git a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
index 317a36151..ae97fdef7 100644
--- a/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/CardinalityTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.QueryConstants;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.junit.Test;
@@ -887,12 +888,14 @@ public class CardinalityTest extends PlannerTestBase {
     // Ndv of int_col is 10;
     // MIN_HASH_TBL_MEM is 10M
     verifyApproxMemoryEstimate("SELECT COUNT(int_col) FROM functional.alltypes "
-        + "GROUP BY int_col", AggregationNode.MIN_HASH_TBL_MEM, true, false,
-        ImmutableSet.of(), pathToFirstAggregationNode, AggregationNode.class);
+            + "GROUP BY int_col",
+        QueryConstants.MIN_HASH_TBL_MEM, true, false, ImmutableSet.of(),
+        pathToFirstAggregationNode, AggregationNode.class);
     // create a single node plan.
     verifyApproxMemoryEstimate("SELECT COUNT(int_col) FROM functional.alltypes "
-        + "GROUP BY int_col", AggregationNode.MIN_HASH_TBL_MEM, false, false,
-        ImmutableSet.of(), pathToAggregationNode, AggregationNode.class);
+            + "GROUP BY int_col",
+        QueryConstants.MIN_HASH_TBL_MEM, false, false, ImmutableSet.of(),
+        pathToAggregationNode, AggregationNode.class);
 
     // FUNCTIONAL.ALLTYPES.ID's Ndv is 7300 and avgRowSize is 4
     // FUNCTIONAL.ALLTYPESSMALL.TIMESTAMP_COL's Ndv is 100 and avgRowSize is 16
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b8dbfa89d..497dcb8a7 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.ColumnStats;
@@ -1386,18 +1387,12 @@ public class PlannerTest extends PlannerTestBase {
    */
   @Test
   public void testProcessingCost() {
-    TQueryOptions options = new TQueryOptions();
+    TQueryOptions options = tpcdsParquetQueryOptions();
     options.setCompute_processing_cost(true);
     options.setProcessing_cost_min_threads(2);
     options.setMax_fragment_instances_per_node(16);
-    options.setMinmax_filter_threshold(0.5);
-    options.setMinmax_filter_sorted_columns(false);
-    options.setMinmax_filter_partition_columns(false);
-    runPlannerTestFile("tpcds-processing-cost", "tpcds_parquet", options,
-        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
-            PlannerTestOption.INCLUDE_RESOURCE_HEADER,
-            PlannerTestOption.VALIDATE_RESOURCES,
-            PlannerTestOption.VALIDATE_CARDINALITY));
+    runPlannerTestFile(
+        "tpcds-processing-cost", "tpcds_parquet", options, tpcdsParquetTestOptions());
   }
 
   /**
@@ -1408,4 +1403,23 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("predicate-selectivity-hint",
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
+
+  /**
+   * Test that memory estimate of aggregation node equals to the default estimation
+   * (with NDV multiplication method) if AGG_MEM_CORRELATION_FACTOR=1.0 or
+   * LARGE_AGG_MEM_THRESHOLD is less than or equal to zero.
+   */
+  @Test
+  public void testAggNodeMaxMemEstimate() {
+    Set<PlannerTestOption> testOptions = tpcdsParquetTestOptions();
+    TQueryOptions options = tpcdsParquetQueryOptions();
+    double defaultAggMemCorrelationFactor = options.getAgg_mem_correlation_factor();
+    options.setAgg_mem_correlation_factor(1.0);
+    runPlannerTestFile(
+        "agg-node-max-mem-estimate", "tpcds_parquet", options, testOptions);
+
+    options.setLarge_agg_mem_threshold(0);
+    runPlannerTestFile(
+        "agg-node-max-mem-estimate", "tpcds_parquet", options, testOptions);
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 154da24ea..7b1baee33 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -47,7 +47,6 @@ import org.apache.impala.testutil.TestFileParser.Section;
 import org.apache.impala.testutil.TestFileParser.TestCase;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.testutil.TestUtils.ResultFilter;
-import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.QueryConstants;
 import org.apache.impala.thrift.TDescriptorTable;
 import org.apache.impala.thrift.TExecRequest;
@@ -83,6 +82,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -400,6 +400,23 @@ public class PlannerTestBase extends FrontendTestBase {
     return options;
   }
 
+  protected static TQueryOptions tpcdsParquetQueryOptions() {
+    TQueryOptions options = new TQueryOptions();
+    /* Enable minmax overlap filter feature for tpcds Parquet tests. */
+    options.setMinmax_filter_threshold(0.5);
+    /* Disable minmax filter on sorted columns. */
+    options.setMinmax_filter_sorted_columns(false);
+    /* Disable minmax filter on partition columns. */
+    options.setMinmax_filter_partition_columns(false);
+    return options;
+  }
+
+  protected static Set<PlannerTestOption> tpcdsParquetTestOptions() {
+    return ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+        PlannerTestOption.INCLUDE_RESOURCE_HEADER, PlannerTestOption.VALIDATE_RESOURCES,
+        PlannerTestOption.VALIDATE_CARDINALITY);
+  }
+
   /**
    * Produces single-node, distributed, and parallel plans for testCase and compares
    * plan and scan range results.
diff --git a/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java b/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java
index 14b9d3dd0..94d14a97b 100644
--- a/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java
@@ -24,8 +24,6 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableSet;
-
 
 /**
  * Plans from the TPC-DS qualification queries at scale factor 1. Single node,
@@ -33,24 +31,14 @@ import com.google.common.collect.ImmutableSet;
  * also preformed.
  */
 public class TpcdsPlannerTest extends PlannerTestBase {
+  private static Set<PlannerTestOption> testOptions = tpcdsParquetTestOptions();
 
-  private static Set<PlannerTestOption> testOptions =
-      ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
-          PlannerTestOption.INCLUDE_RESOURCE_HEADER, PlannerTestOption.VALIDATE_RESOURCES,
-          PlannerTestOption.VALIDATE_CARDINALITY);
-
-  private static TQueryOptions options = new TQueryOptions();
+  private static TQueryOptions options = tpcdsParquetQueryOptions();
 
   @BeforeClass
   public static void setUp() throws Exception {
     PlannerTestBase.setUp();
     Paths.get(outDir_.toString(), "tpcds").toFile().mkdirs();
-    /* Enable minmax overlap filter feature for tpcds Parquet tests. */
-    options.setMinmax_filter_threshold(0.5);
-    /* Disable minmax filter on sorted columns. */
-    options.setMinmax_filter_sorted_columns(false);
-    /* Disable minmax filter on partition columns. */
-    options.setMinmax_filter_partition_columns(false);
   }
 
   @Test
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test b/testdata/workloads/functional-planner/queries/PlannerTest/agg-node-max-mem-estimate.test
similarity index 63%
copy from testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
copy to testdata/workloads/functional-planner/queries/PlannerTest/agg-node-max-mem-estimate.test
index 5de0877a8..8c2acd5da 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/agg-node-max-mem-estimate.test
@@ -1,3 +1,490 @@
+# TPCDS-Q22
+select  i_product_name
+             ,i_brand
+             ,i_class
+             ,i_category
+             ,avg(inv_quantity_on_hand) qoh
+       from inventory
+           ,date_dim
+           ,item
+           ,warehouse
+       where inv_date_sk=d_date_sk
+              and inv_item_sk=i_item_sk
+              and inv_warehouse_sk = w_warehouse_sk
+              and d_month_seq between 1212 and 1212 + 11
+       group by rollup(i_product_name
+                       ,i_brand
+                       ,i_class
+                       ,i_category)
+order by qoh, i_product_name, i_brand, i_class, i_category
+limit 100;
+---- PLAN
+Max Per-Host Resource Reservation: Memory=149.44MB Threads=5
+Per-Host Resource Estimates: Memory=16.63GB
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.63GB mem-reservation=149.44MB thread-reservation=5 runtime-filters-memory=3.00MB
+PLAN-ROOT SINK
+|  output exprs: CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_brand WHEN 7 THEN i_brand WHEN 9 THEN i_brand WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_class WHEN 7 THEN i_class WHEN 9 THEN NULL WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_category WHEN 7 TH [...]
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+09:TOP-N [LIMIT=100]
+|  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
+|  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=15 row-size=56B cardinality=100
+|  in pipelines: 09(GETNEXT), 08(OPEN)
+|
+08:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid(5,7,9,11,13) IN (CAST(5 AS INT), CAST(7 AS INT), CAST(9 AS INT), CAST(11 AS INT), CAST(13 AS INT)), CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(7 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(9 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(11 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(13 AS INT) THEN avg(inv_quantity_on_hand) END)
+|  group by: CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_product_name WHEN CAST(7 AS INT) THEN i_product_name WHEN CAST(9 AS INT) THEN i_product_name WHEN CAST(11 AS INT) THEN i_product_name WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_brand WHEN CAST(7 AS INT) THEN i_brand WHEN CAST(9 AS INT) THEN i_brand WHEN CAST(11 AS INT) THEN NULL WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_class [...]
+|  mem-estimate=2.36GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=14 row-size=60B cardinality=35.25M
+|  in pipelines: 08(GETNEXT), 07(OPEN)
+|
+07:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, i_class, i_category
+|  Class 1
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, i_class, NULL
+|  Class 2
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, NULL, NULL
+|  Class 3
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, NULL, NULL, NULL
+|  Class 4
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: NULL, NULL, NULL, NULL
+|  mem-estimate=14.27GB mem-reservation=112.44MB thread-reservation=0
+|  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: inv_warehouse_sk = w_warehouse_sk
+|  fk/pk conjuncts: inv_warehouse_sk = w_warehouse_sk
+|  runtime filters: RF000[bloom] <- w_warehouse_sk, RF001[min_max] <- w_warehouse_sk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1,2,3 row-size=136B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--03:SCAN HDFS [tpcds_parquet.warehouse]
+|     HDFS partitions=1/1 files=1 size=4.38KB
+|     stored statistics:
+|       table: rows=5 size=4.38KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=5
+|     mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+|     tuple-ids=3 row-size=4B cardinality=5
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: inv_item_sk = i_item_sk
+|  fk/pk conjuncts: inv_item_sk = i_item_sk
+|  runtime filters: RF002[bloom] <- i_item_sk, RF003[min_max] <- i_item_sk
+|  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB thread-reservation=0
+|  tuple-ids=0,1,2 row-size=132B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [tpcds_parquet.item]
+|     HDFS partitions=1/1 files=1 size=1.73MB
+|     stored statistics:
+|       table: rows=18.00K size=1.73MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=18.00K
+|     mem-estimate=80.00MB mem-reservation=1.00MB thread-reservation=1
+|     tuple-ids=2 row-size=104B cardinality=18.00K
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: inv_date_sk = d_date_sk
+|  fk/pk conjuncts: inv_date_sk = d_date_sk
+|  runtime filters: RF004[bloom] <- d_date_sk, RF005[min_max] <- d_date_sk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=28B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpcds_parquet.date_dim]
+|     HDFS partitions=1/1 files=1 size=2.15MB
+|     predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     stored statistics:
+|       table: rows=73.05K size=2.15MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|     parquet statistics predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     parquet dictionary predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     mem-estimate=32.00MB mem-reservation=512.00KB thread-reservation=1
+|     tuple-ids=1 row-size=8B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_parquet.inventory]
+   HDFS partitions=1/1 files=2 size=34.09MB
+   runtime filters: RF001[min_max] -> inv_warehouse_sk, RF003[min_max] -> inv_item_sk, RF005[min_max] -> inv_date_sk, RF000[bloom] -> inv_warehouse_sk, RF002[bloom] -> inv_item_sk, RF004[bloom] -> inv_date_sk
+   stored statistics:
+     table: rows=11.74M size=34.09MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=6.66M
+   mem-estimate=128.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=20B cardinality=11.74M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=288.82MB Threads=10
+Per-Host Resource Estimates: Memory=13.96GB
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_brand WHEN 7 THEN i_brand WHEN 9 THEN i_brand WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_class WHEN 7 THEN i_class WHEN 9 THEN NULL WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_category WHEN 7 TH [...]
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+15:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
+|  limit: 100
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=15 row-size=56B cardinality=100
+|  in pipelines: 09(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash [...]
+Per-Host Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservation=1
+09:TOP-N [LIMIT=100]
+|  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
+|  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=15 row-size=56B cardinality=100
+|  in pipelines: 09(GETNEXT), 08(OPEN)
+|
+08:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid(5,7,9,11,13) IN (CAST(5 AS INT), CAST(7 AS INT), CAST(9 AS INT), CAST(11 AS INT), CAST(13 AS INT)), CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(7 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(9 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(11 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(13 AS INT) THEN avg(inv_quantity_on_hand) END)
+|  group by: CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_product_name WHEN CAST(7 AS INT) THEN i_product_name WHEN CAST(9 AS INT) THEN i_product_name WHEN CAST(11 AS INT) THEN i_product_name WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_brand WHEN CAST(7 AS INT) THEN i_brand WHEN CAST(9 AS INT) THEN i_brand WHEN CAST(11 AS INT) THEN NULL WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_class [...]
+|  mem-estimate=1.18GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=14 row-size=60B cardinality=35.25M
+|  in pipelines: 08(GETNEXT), 14(OPEN)
+|
+14:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, i_brand, i_class, i_category
+|  Class 1
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, i_brand, i_class, NULL
+|  Class 2
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, i_brand, NULL, NULL
+|  Class 3
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, NULL, NULL, NULL
+|  Class 4
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: NULL, NULL, NULL, NULL
+|  mem-estimate=7.14GB mem-reservation=108.69MB thread-reservation=0
+|  tuple-ids=5N,7N,9N,11N,13N row-size=422B cardinality=35.25M
+|  in pipelines: 14(GETNEXT), 00(OPEN)
+|
+13:EXCHANGE [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_cla [...]
+|  mem-estimate=10.86MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.50GB mem-reservation=140.62MB thread-reservation=2 runtime-filters-memory=3.00MB
+07:AGGREGATE [STREAMING]
+|  Class 0
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, i_class, i_category
+|  Class 1
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, i_class, NULL
+|  Class 2
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, NULL, NULL
+|  Class 3
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, NULL, NULL, NULL
+|  Class 4
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: NULL, NULL, NULL, NULL
+|  mem-estimate=5.36GB mem-reservation=113.00MB thread-reservation=0
+|  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
+|  in pipelines: 00(GETNEXT)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: inv_warehouse_sk = w_warehouse_sk
+|  fk/pk conjuncts: inv_warehouse_sk = w_warehouse_sk
+|  runtime filters: RF000[bloom] <- w_warehouse_sk, RF001[min_max] <- w_warehouse_sk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1,2,3 row-size=136B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--12:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=3 row-size=4B cardinality=5
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.03MB mem-reservation=8.00KB thread-reservation=2
+|  03:SCAN HDFS [tpcds_parquet.warehouse, RANDOM]
+|     HDFS partitions=1/1 files=1 size=4.38KB
+|     stored statistics:
+|       table: rows=5 size=4.38KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=5
+|     mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+|     tuple-ids=3 row-size=4B cardinality=5
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: inv_item_sk = i_item_sk
+|  fk/pk conjuncts: inv_item_sk = i_item_sk
+|  runtime filters: RF002[bloom] <- i_item_sk, RF003[min_max] <- i_item_sk
+|  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB thread-reservation=0
+|  tuple-ids=0,1,2 row-size=132B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--11:EXCHANGE [BROADCAST]
+|  |  mem-estimate=1.89MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=104B cardinality=18.00K
+|  |  in pipelines: 02(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=80.42MB mem-reservation=1.00MB thread-reservation=2
+|  02:SCAN HDFS [tpcds_parquet.item, RANDOM]
+|     HDFS partitions=1/1 files=1 size=1.73MB
+|     stored statistics:
+|       table: rows=18.00K size=1.73MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=18.00K
+|     mem-estimate=80.00MB mem-reservation=1.00MB thread-reservation=1
+|     tuple-ids=2 row-size=104B cardinality=18.00K
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: inv_date_sk = d_date_sk
+|  fk/pk conjuncts: inv_date_sk = d_date_sk
+|  runtime filters: RF004[bloom] <- d_date_sk, RF005[min_max] <- d_date_sk
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=28B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--10:EXCHANGE [BROADCAST]
+|  |  mem-estimate=69.07KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=8B cardinality=7.30K
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.05MB mem-reservation=512.00KB thread-reservation=2
+|  01:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
+|     HDFS partitions=1/1 files=1 size=2.15MB
+|     predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     stored statistics:
+|       table: rows=73.05K size=2.15MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|     parquet statistics predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     parquet dictionary predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     mem-estimate=32.00MB mem-reservation=512.00KB thread-reservation=1
+|     tuple-ids=1 row-size=8B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_parquet.inventory, RANDOM]
+   HDFS partitions=1/1 files=2 size=34.09MB
+   runtime filters: RF001[min_max] -> inv_warehouse_sk, RF003[min_max] -> inv_item_sk, RF005[min_max] -> inv_date_sk, RF000[bloom] -> inv_warehouse_sk, RF002[bloom] -> inv_item_sk, RF004[bloom] -> inv_date_sk
+   stored statistics:
+     table: rows=11.74M size=34.09MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=6.66M
+   mem-estimate=128.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=20B cardinality=11.74M
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=300.45MB Threads=9
+Per-Host Resource Estimates: Memory=13.80GB
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_brand WHEN 7 THEN i_brand WHEN 9 THEN i_brand WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_class WHEN 7 THEN i_class WHEN 9 THEN NULL WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_category WHEN 7 TH [...]
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+15:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
+|  limit: 100
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=15 row-size=56B cardinality=100
+|  in pipelines: 09(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash [...]
+Per-Instance Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservation=1
+09:TOP-N [LIMIT=100]
+|  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
+|  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=15 row-size=56B cardinality=100
+|  in pipelines: 09(GETNEXT), 08(OPEN)
+|
+08:AGGREGATE [FINALIZE]
+|  output: aggif(valid_tid(5,7,9,11,13) IN (CAST(5 AS INT), CAST(7 AS INT), CAST(9 AS INT), CAST(11 AS INT), CAST(13 AS INT)), CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(7 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(9 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(11 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(13 AS INT) THEN avg(inv_quantity_on_hand) END)
+|  group by: CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_product_name WHEN CAST(7 AS INT) THEN i_product_name WHEN CAST(9 AS INT) THEN i_product_name WHEN CAST(11 AS INT) THEN i_product_name WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_brand WHEN CAST(7 AS INT) THEN i_brand WHEN CAST(9 AS INT) THEN i_brand WHEN CAST(11 AS INT) THEN NULL WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_class [...]
+|  mem-estimate=1.18GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=14 row-size=60B cardinality=35.25M
+|  in pipelines: 08(GETNEXT), 14(OPEN)
+|
+14:AGGREGATE [FINALIZE]
+|  Class 0
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, i_brand, i_class, i_category
+|  Class 1
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, i_brand, i_class, NULL
+|  Class 2
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, i_brand, NULL, NULL
+|  Class 3
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: i_product_name, NULL, NULL, NULL
+|  Class 4
+|    output: avg:merge(inv_quantity_on_hand)
+|    group by: NULL, NULL, NULL, NULL
+|  mem-estimate=7.14GB mem-reservation=108.69MB thread-reservation=0
+|  tuple-ids=5N,7N,9N,11N,13N row-size=422B cardinality=35.25M
+|  in pipelines: 14(GETNEXT), 00(OPEN)
+|
+13:EXCHANGE [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_cla [...]
+|  mem-estimate=10.86MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Shared Resources: mem-estimate=3.00MB mem-reservation=3.00MB thread-reservation=0 runtime-filters-memory=3.00MB
+Per-Instance Resources: mem-estimate=5.40GB mem-reservation=129.00MB thread-reservation=1
+07:AGGREGATE [STREAMING]
+|  Class 0
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, i_class, i_category
+|  Class 1
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, i_class, NULL
+|  Class 2
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, i_brand, NULL, NULL
+|  Class 3
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: i_product_name, NULL, NULL, NULL
+|  Class 4
+|    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
+|    group by: NULL, NULL, NULL, NULL
+|  mem-estimate=5.36GB mem-reservation=113.00MB thread-reservation=0
+|  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
+|  in pipelines: 00(GETNEXT)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: inv_warehouse_sk = w_warehouse_sk
+|  fk/pk conjuncts: inv_warehouse_sk = w_warehouse_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1,2,3 row-size=136B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--F06:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  |  Per-Instance Resources: mem-estimate=4.89MB mem-reservation=4.88MB thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: w_warehouse_sk
+|  |  runtime filters: RF000[bloom] <- w_warehouse_sk, RF001[min_max] <- w_warehouse_sk
+|  |  mem-estimate=3.88MB mem-reservation=3.88MB spill-buffer=64.00KB thread-reservation=0
+|  |
+|  12:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=3 row-size=4B cardinality=5
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.03MB mem-reservation=8.00KB thread-reservation=1
+|  03:SCAN HDFS [tpcds_parquet.warehouse, RANDOM]
+|     HDFS partitions=1/1 files=1 size=4.38KB
+|     stored statistics:
+|       table: rows=5 size=4.38KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=5
+|     mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
+|     tuple-ids=3 row-size=4B cardinality=5
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=01
+|  hash predicates: inv_item_sk = i_item_sk
+|  fk/pk conjuncts: inv_item_sk = i_item_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB thread-reservation=0
+|  tuple-ids=0,1,2 row-size=132B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--F07:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  |  Per-Instance Resources: mem-estimate=12.39MB mem-reservation=10.50MB thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=01 plan-id=02 cohort-id=01
+|  |  build expressions: i_item_sk
+|  |  runtime filters: RF002[bloom] <- i_item_sk, RF003[min_max] <- i_item_sk
+|  |  mem-estimate=9.50MB mem-reservation=9.50MB spill-buffer=256.00KB thread-reservation=0
+|  |
+|  11:EXCHANGE [BROADCAST]
+|  |  mem-estimate=1.89MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=104B cardinality=18.00K
+|  |  in pipelines: 02(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.42MB mem-reservation=1.00MB thread-reservation=1
+|  02:SCAN HDFS [tpcds_parquet.item, RANDOM]
+|     HDFS partitions=1/1 files=1 size=1.73MB
+|     stored statistics:
+|       table: rows=18.00K size=1.73MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=18.00K
+|     mem-estimate=16.00MB mem-reservation=1.00MB thread-reservation=0
+|     tuple-ids=2 row-size=104B cardinality=18.00K
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=02
+|  hash predicates: inv_date_sk = d_date_sk
+|  fk/pk conjuncts: inv_date_sk = d_date_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=28B cardinality=11.74M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F08:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  |  Per-Instance Resources: mem-estimate=4.94MB mem-reservation=4.88MB thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |  runtime filters: RF004[bloom] <- d_date_sk, RF005[min_max] <- d_date_sk
+|  |  mem-estimate=3.88MB mem-reservation=3.88MB spill-buffer=64.00KB thread-reservation=0
+|  |
+|  10:EXCHANGE [BROADCAST]
+|  |  mem-estimate=69.07KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=8B cardinality=7.30K
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB thread-reservation=1
+|  01:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
+|     HDFS partitions=1/1 files=1 size=2.15MB
+|     predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     stored statistics:
+|       table: rows=73.05K size=2.15MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|     parquet statistics predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     parquet dictionary predicates: d_month_seq <= CAST(1223 AS INT), d_month_seq >= CAST(1212 AS INT)
+|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|     tuple-ids=1 row-size=8B cardinality=7.30K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_parquet.inventory, RANDOM]
+   HDFS partitions=1/1 files=2 size=34.09MB
+   runtime filters: RF001[min_max] -> inv_warehouse_sk, RF003[min_max] -> inv_item_sk, RF005[min_max] -> inv_date_sk, RF000[bloom] -> inv_warehouse_sk, RF002[bloom] -> inv_item_sk, RF004[bloom] -> inv_date_sk
+   stored statistics:
+     table: rows=11.74M size=34.09MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=6.66M
+   mem-estimate=32.00MB mem-reservation=16.00MB thread-reservation=0
+   tuple-ids=0 row-size=20B cardinality=11.74M
+   in pipelines: 00(GETNEXT)
+====
 # TPCDS-Q67
 select  *
 from (select i_category
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index b7abe2566..ee00ac9c4 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -5892,12 +5892,12 @@ from tpch_parquet.lineitem
 limit 5
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=74.00MB Threads=2
-Per-Host Resource Estimates: Memory=1.44GB
+Per-Host Resource Estimates: Memory=1.13GB
 Analyzed query: SELECT DISTINCT * FROM tpch_parquet.lineitem LIMIT CAST(5 AS
 TINYINT)
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=1.44GB mem-reservation=74.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=1.13GB mem-reservation=74.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_ [...]
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -5905,7 +5905,7 @@ PLAN-ROOT SINK
 01:AGGREGATE [FINALIZE]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_ship [...]
 |  limit: 5
-|  mem-estimate=1.36GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=1.05GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
index 1288f9826..264f6aeb9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q22.test
@@ -20,9 +20,9 @@ order by qoh, i_product_name, i_brand, i_class, i_category
 limit 100;
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=149.44MB Threads=5
-Per-Host Resource Estimates: Memory=16.63GB
+Per-Host Resource Estimates: Memory=2.88GB
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=16.63GB mem-reservation=149.44MB thread-reservation=5 runtime-filters-memory=3.00MB
+|  Per-Host Resources: mem-estimate=2.88GB mem-reservation=149.44MB thread-reservation=5 runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  output exprs: CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_brand WHEN 7 THEN i_brand WHEN 9 THEN i_brand WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_class WHEN 7 THEN i_class WHEN 9 THEN NULL WHEN 11 THEN NULL WHEN 13 THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_category WHEN 7 TH [...]
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -36,7 +36,7 @@ PLAN-ROOT SINK
 08:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(5,7,9,11,13) IN (CAST(5 AS INT), CAST(7 AS INT), CAST(9 AS INT), CAST(11 AS INT), CAST(13 AS INT)), CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(7 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(9 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(11 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(13 AS INT) THEN avg(inv_quantity_on_hand) END)
 |  group by: CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_product_name WHEN CAST(7 AS INT) THEN i_product_name WHEN CAST(9 AS INT) THEN i_product_name WHEN CAST(11 AS INT) THEN i_product_name WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_brand WHEN CAST(7 AS INT) THEN i_brand WHEN CAST(9 AS INT) THEN i_brand WHEN CAST(11 AS INT) THEN NULL WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_class [...]
-|  mem-estimate=2.36GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=571.64MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=14 row-size=60B cardinality=35.25M
 |  in pipelines: 08(GETNEXT), 07(OPEN)
 |
@@ -56,7 +56,7 @@ PLAN-ROOT SINK
 |  Class 4
 |    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=14.27GB mem-reservation=112.44MB thread-reservation=0
+|  mem-estimate=2.32GB mem-reservation=112.44MB thread-reservation=0
 |  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
 |  in pipelines: 07(GETNEXT), 00(OPEN)
 |
@@ -129,7 +129,7 @@ PLAN-ROOT SINK
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=288.82MB Threads=10
-Per-Host Resource Estimates: Memory=13.96GB
+Per-Host Resource Estimates: Memory=4.42GB
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
 PLAN-ROOT SINK
@@ -144,7 +144,7 @@ PLAN-ROOT SINK
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash [...]
-Per-Host Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservation=1
+Per-Host Resources: mem-estimate=2.39GB mem-reservation=142.69MB thread-reservation=1
 09:TOP-N [LIMIT=100]
 |  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
 |  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
@@ -154,7 +154,7 @@ Per-Host Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservat
 08:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(5,7,9,11,13) IN (CAST(5 AS INT), CAST(7 AS INT), CAST(9 AS INT), CAST(11 AS INT), CAST(13 AS INT)), CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(7 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(9 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(11 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(13 AS INT) THEN avg(inv_quantity_on_hand) END)
 |  group by: CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_product_name WHEN CAST(7 AS INT) THEN i_product_name WHEN CAST(9 AS INT) THEN i_product_name WHEN CAST(11 AS INT) THEN i_product_name WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_brand WHEN CAST(7 AS INT) THEN i_brand WHEN CAST(9 AS INT) THEN i_brand WHEN CAST(11 AS INT) THEN NULL WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_class [...]
-|  mem-estimate=1.18GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=533.82MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=14 row-size=60B cardinality=35.25M
 |  in pipelines: 08(GETNEXT), 14(OPEN)
 |
@@ -174,7 +174,7 @@ Per-Host Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservat
 |  Class 4
 |    output: avg:merge(inv_quantity_on_hand)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=7.14GB mem-reservation=108.69MB thread-reservation=0
+|  mem-estimate=1.87GB mem-reservation=108.69MB thread-reservation=0
 |  tuple-ids=5N,7N,9N,11N,13N row-size=422B cardinality=35.25M
 |  in pipelines: 14(GETNEXT), 00(OPEN)
 |
@@ -184,7 +184,7 @@ Per-Host Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservat
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
-Per-Host Resources: mem-estimate=5.50GB mem-reservation=140.62MB thread-reservation=2 runtime-filters-memory=3.00MB
+Per-Host Resources: mem-estimate=1.90GB mem-reservation=140.62MB thread-reservation=2 runtime-filters-memory=3.00MB
 07:AGGREGATE [STREAMING]
 |  Class 0
 |    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
@@ -201,7 +201,7 @@ Per-Host Resources: mem-estimate=5.50GB mem-reservation=140.62MB thread-reservat
 |  Class 4
 |    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=5.36GB mem-reservation=113.00MB thread-reservation=0
+|  mem-estimate=1.76GB mem-reservation=113.00MB thread-reservation=0
 |  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
 |  in pipelines: 00(GETNEXT)
 |
@@ -295,7 +295,7 @@ Per-Host Resources: mem-estimate=5.50GB mem-reservation=140.62MB thread-reservat
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=300.45MB Threads=9
-Per-Host Resource Estimates: Memory=13.80GB
+Per-Host Resource Estimates: Memory=4.26GB
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
 PLAN-ROOT SINK
@@ -310,7 +310,7 @@ PLAN-ROOT SINK
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_product_name) WHEN 6 THEN murmur_hash(i_product_name) WHEN 8 THEN murmur_hash(i_product_name) WHEN 10 THEN murmur_hash(i_product_name) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash(i_brand) WHEN 6 THEN murmur_hash(i_brand) WHEN 8 THEN murmur_hash(i_brand) WHEN 10 THEN murmur_hash(NULL) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,6,8,10,12) WHEN 4 THEN murmur_hash [...]
-Per-Instance Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-reservation=1
+Per-Instance Resources: mem-estimate=2.39GB mem-reservation=142.69MB thread-reservation=1
 09:TOP-N [LIMIT=100]
 |  order by: aggif(valid_tid(5,7,9,11,13) IN (5, 7, 9, 11, 13), CASE valid_tid(5,7,9,11,13) WHEN 5 THEN avg(inv_quantity_on_hand) WHEN 7 THEN avg(inv_quantity_on_hand) WHEN 9 THEN avg(inv_quantity_on_hand) WHEN 11 THEN avg(inv_quantity_on_hand) WHEN 13 THEN avg(inv_quantity_on_hand) END) ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 THEN i_product_name WHEN 7 THEN i_product_name WHEN 9 THEN i_product_name WHEN 11 THEN i_product_name WHEN 13 THEN NULL END ASC, CASE valid_tid(5,7,9,11,13) WHEN 5 [...]
 |  mem-estimate=5.47KB mem-reservation=0B thread-reservation=0
@@ -320,7 +320,7 @@ Per-Instance Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-rese
 08:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(5,7,9,11,13) IN (CAST(5 AS INT), CAST(7 AS INT), CAST(9 AS INT), CAST(11 AS INT), CAST(13 AS INT)), CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(7 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(9 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(11 AS INT) THEN avg(inv_quantity_on_hand) WHEN CAST(13 AS INT) THEN avg(inv_quantity_on_hand) END)
 |  group by: CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_product_name WHEN CAST(7 AS INT) THEN i_product_name WHEN CAST(9 AS INT) THEN i_product_name WHEN CAST(11 AS INT) THEN i_product_name WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_brand WHEN CAST(7 AS INT) THEN i_brand WHEN CAST(9 AS INT) THEN i_brand WHEN CAST(11 AS INT) THEN NULL WHEN CAST(13 AS INT) THEN NULL END, CASE valid_tid(5,7,9,11,13) WHEN CAST(5 AS INT) THEN i_class [...]
-|  mem-estimate=1.18GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=533.82MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=14 row-size=60B cardinality=35.25M
 |  in pipelines: 08(GETNEXT), 14(OPEN)
 |
@@ -340,7 +340,7 @@ Per-Instance Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-rese
 |  Class 4
 |    output: avg:merge(inv_quantity_on_hand)
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=7.14GB mem-reservation=108.69MB thread-reservation=0
+|  mem-estimate=1.87GB mem-reservation=108.69MB thread-reservation=0
 |  tuple-ids=5N,7N,9N,11N,13N row-size=422B cardinality=35.25M
 |  in pipelines: 14(GETNEXT), 00(OPEN)
 |
@@ -351,7 +351,7 @@ Per-Instance Resources: mem-estimate=8.33GB mem-reservation=142.69MB thread-rese
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 Per-Host Shared Resources: mem-estimate=3.00MB mem-reservation=3.00MB thread-reservation=0 runtime-filters-memory=3.00MB
-Per-Instance Resources: mem-estimate=5.40GB mem-reservation=129.00MB thread-reservation=1
+Per-Instance Resources: mem-estimate=1.79GB mem-reservation=129.00MB thread-reservation=1
 07:AGGREGATE [STREAMING]
 |  Class 0
 |    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
@@ -368,7 +368,7 @@ Per-Instance Resources: mem-estimate=5.40GB mem-reservation=129.00MB thread-rese
 |  Class 4
 |    output: avg(CAST(inv_quantity_on_hand AS BIGINT))
 |    group by: NULL, NULL, NULL, NULL
-|  mem-estimate=5.36GB mem-reservation=113.00MB thread-reservation=0
+|  mem-estimate=1.76GB mem-reservation=113.00MB thread-reservation=0
 |  tuple-ids=4N,6N,8N,10N,12N row-size=422B cardinality=35.25M
 |  in pipelines: 00(GETNEXT)
 |
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
index 5de0877a8..1c81c16fe 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q67.test
@@ -42,9 +42,9 @@ order by i_category
 limit 100;
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=251.75MB Threads=5
-Per-Host Resource Estimates: Memory=17.08GB
+Per-Host Resource Estimates: Memory=3.59GB
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=17.08GB mem-reservation=251.75MB thread-reservation=5 runtime-filters-memory=3.00MB
+|  Per-Host Resources: mem-estimate=3.59GB mem-reservation=251.75MB thread-reservation=5 runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  output exprs: i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id, sumsales, rk
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -81,7 +81,7 @@ PLAN-ROOT SINK
 08:AGGREGATE [FINALIZE]
 |  output: aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (CAST(4 AS INT), CAST(5 AS INT), CAST(6 AS INT), CAST(7 AS INT), CAST(8 AS INT), CAST(9 AS INT), CAST(10 AS INT), CAST(11 AS INT), CAST(12 AS INT)), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN CAST(4 AS INT) THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN CAST(5 AS INT) THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN CAST(6 AS INT) THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN CAST(7 AS INT) THEN sum(coalesce( [...]
 |  group by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN CAST(4 AS INT) THEN i_category WHEN CAST(5 AS INT) THEN i_category WHEN CAST(6 AS INT) THEN i_category WHEN CAST(7 AS INT) THEN i_category WHEN CAST(8 AS INT) THEN i_category WHEN CAST(9 AS INT) THEN i_category WHEN CAST(10 AS INT) THEN i_category WHEN CAST(11 AS INT) THEN i_category WHEN CAST(12 AS INT) THEN NULL END, CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN CAST(4 AS INT) THEN i_class WHEN CAST(5 AS INT) THEN i_class WHEN CAST(6 A [...]
-|  mem-estimate=1.46GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=513.92MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=13 row-size=92B cardinality=15.09M
 |  in pipelines: 08(GETNEXT), 07(OPEN)
 |
@@ -113,7 +113,7 @@ PLAN-ROOT SINK
 |  Class 8
 |    output: sum(coalesce(ss_sales_price * CAST(ss_quantity AS DECIMAL(10,0)), CAST(0 AS DECIMAL(18,2))))
 |    group by: NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
-|  mem-estimate=15.61GB mem-reservation=210.75MB thread-reservation=0
+|  mem-estimate=3.08GB mem-reservation=210.75MB thread-reservation=0
 |  tuple-ids=4N,5N,6N,7N,8N,9N,10N,11N,12N row-size=1.07KB cardinality=15.09M
 |  in pipelines: 07(GETNEXT), 00(OPEN)
 |
@@ -187,7 +187,7 @@ PLAN-ROOT SINK
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=477.45MB Threads=11
-Per-Host Resource Estimates: Memory=10.45GB
+Per-Host Resource Estimates: Memory=6.58GB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.03MB mem-reservation=4.00MB thread-reservation=1
 PLAN-ROOT SINK
@@ -237,7 +237,7 @@ Per-Host Resources: mem-estimate=4.08MB mem-reservation=4.00MB thread-reservatio
 |  in pipelines: 09(GETNEXT)
 |
 F04:PLAN FRAGMENT [HASH(CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN murmur_hash(i_category) WHEN 5 THEN murmur_hash(i_category) WHEN 6 THEN murmur_hash(i_category) WHEN 7 THEN murmur_hash(i_category) WHEN 8 THEN murmur_hash(i_category) WHEN 9 THEN murmur_hash(i_category) WHEN 10 THEN murmur_hash(i_category) WHEN 11 THEN murmur_hash(i_category) WHEN 12 THEN murmur_hash(NULL) END,CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN murmur_hash(i_class) WHEN 5 THEN murmur_hash(i_class) WHE [...]
-Per-Host Resources: mem-estimate=5.71GB mem-reservation=243.81MB thread-reservation=1
+Per-Host Resources: mem-estimate=3.26GB mem-reservation=243.81MB thread-reservation=1
 09:TOP-N
 |  order by: CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN i_category WHEN 5 THEN i_category WHEN 6 THEN i_category WHEN 7 THEN i_category WHEN 8 THEN i_category WHEN 9 THEN i_category WHEN 10 THEN i_category WHEN 11 THEN i_category WHEN 12 THEN NULL END ASC NULLS LAST, aggif(valid_tid(4,5,6,7,8,9,10,11,12) IN (4, 5, 6, 7, 8, 9, 10, 11, 12), CASE valid_tid(4,5,6,7,8,9,10,11,12) WHEN 4 THEN sum(coalesce(ss_sales_price * ss_quantity, 0)) WHEN 5 THEN sum(coalesce(ss_sales_price * ss_quan [...]
 |  limit with ties: 200
@@ -281,7 +281,7 @@ Per-Host Resources: mem-estimate=5.71GB mem-reservation=243.81MB thread-reservat
 |  Class 8
 |    output: sum:merge(coalesce(ss_sales_price * ss_quantity, 0))
 |    group by: NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
-|  mem-estimate=5.22GB mem-reservation=209.81MB thread-reservation=0
+|  mem-estimate=2.77GB mem-reservation=209.81MB thread-reservation=0
 |  tuple-ids=4N,5N,6N,7N,8N,9N,10N,11N,12N row-size=1.07KB cardinality=15.09M
 |  in pipelines: 17(GETNEXT), 00(OPEN)
 |
@@ -291,7 +291,7 @@ Per-Host Resources: mem-estimate=5.71GB mem-reservation=243.81MB thread-reservat
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=4.54GB mem-reservation=223.62MB thread-reservation=2 runtime-filters-memory=3.00MB
+Per-Host Resources: mem-estimate=3.12GB mem-reservation=223.62MB thread-reservation=2 runtime-filters-memory=3.00MB
 07:AGGREGATE [STREAMING]
 |  Class 0
 |    output: sum(coalesce(ss_sales_price * CAST(ss_quantity AS DECIMAL(10,0)), CAST(0 AS DECIMAL(18,2))))
@@ -320,7 +320,7 @@ Per-Host Resources: mem-estimate=4.54GB mem-reservation=223.62MB thread-reservat
 |  Class 8
 |    output: sum(coalesce(ss_sales_price * CAST(ss_quantity AS DECIMAL(10,0)), CAST(0 AS DECIMAL(18,2))))
 |    group by: NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
-|  mem-estimate=4.45GB mem-reservation=211.00MB thread-reservation=0
+|  mem-estimate=3.03GB mem-reservation=211.00MB thread-reservation=0
 |  tuple-ids=4N,5N,6N,7N,8N,9N,10N,11N,12N row-size=1.07KB cardinality=15.09M
 |  in pipelines: 00(GETNEXT)
 |