You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bi...@apache.org on 2018/10/25 23:18:12 UTC

[4/4] impala git commit: IMPALA-7351: Add estimates to Exchange node

IMPALA-7351: Add estimates to Exchange node

Added rough memory estimates for the exchange node, along with a
justification of the method in the in-line comments.

Testing:
Updated Planner tests.

Change-Id: I5b577f9511abc48b992e814d50bba4959f23f7fd
Reviewed-on: http://gerrit.cloudera.org:8080/11692
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8cbec20e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8cbec20e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8cbec20e

Branch: refs/heads/master
Commit: 8cbec20ea4f1399fce14018d30050d4b7ee501bc
Parents: 1d72c75
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Oct 11 12:16:11 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 25 21:17:12 2018 +0000

----------------------------------------------------------------------
 be/src/util/backend-gflag-util.cc               |   3 +
 common/thrift/BackendGflags.thrift              |   2 +
 .../org/apache/impala/planner/ExchangeNode.java |  95 +-
 .../org/apache/impala/planner/PlanNode.java     |   9 +
 .../queries/PlannerTest/kudu-selectivity.test   |  16 +-
 .../queries/PlannerTest/max-row-size.test       | 104 +-
 .../PlannerTest/resource-requirements.test      | 972 +++++++++----------
 .../PlannerTest/spillable-buffer-sizing.test    | 214 ++--
 .../queries/PlannerTest/tpcds-all.test          |  60 +-
 .../queries/PlannerTest/tpch-all.test           |  64 +-
 .../queries/PlannerTest/tpch-nested.test        | 172 ++--
 .../queries/QueryTest/explain-level2.test       |  18 +-
 .../queries/QueryTest/explain-level3.test       |  22 +-
 13 files changed, 920 insertions(+), 831 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 3760c84..9f4f2d0 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -67,6 +67,7 @@ DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
 DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
 DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
 DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
+DECLARE_int64(exchg_node_buffer_size_bytes);
 
 namespace impala {
 
@@ -132,6 +133,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
       FLAGS_catalog_max_parallel_partial_fetch_rpc);
   cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s(
       FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
+  cfg.__set_exchg_node_buffer_size_bytes(
+      FLAGS_exchg_node_buffer_size_bytes);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 531fc2c..dff4b8c 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -107,4 +107,6 @@ struct TBackendGflags {
   40: required i32 catalog_max_parallel_partial_fetch_rpc
 
   41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
+
+  42: required i64 exchg_node_buffer_size_bytes
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 05184f2..356ae6b 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -22,12 +22,14 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExchangeNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -49,6 +51,9 @@ public class ExchangeNode extends PlanNode {
   // update this constant as well.
   private static final double PER_TUPLE_SERIALIZATION_OVERHEAD = 4.0;
 
+  // Empirically derived minimum estimate (in bytes) for the exchange node.
+  private static final int MIN_ESTIMATE_BYTES = 16 * 1024;
+
   // The parameters based on which sorted input streams are merged by this
   // exchange node. Null if this exchange does not merge sorted streams
   private SortInfo mergeInfo_;
@@ -57,6 +62,21 @@ public class ExchangeNode extends PlanNode {
   // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
   private long offset_;
 
+  // True iff this exchange merges sorted input streams (mergeInfo_ is set).
+  private boolean isMergingExchange() { return mergeInfo_ != null; }
+
+  private boolean isBroadcastExchange() {
+    Preconditions.checkState(!children_.isEmpty());
+    DataSink childSink = getChild(0).getFragment().getSink();
+    if (childSink == null) return false;
+    Preconditions.checkState(childSink instanceof DataStreamSink);
+    // The exchange is a broadcast when the sender's output is unpartitioned
+    // while this receiving fragment is partitioned.
+    DataStreamSink streamSink = (DataStreamSink) childSink;
+    if (streamSink.getOutputPartition().isPartitioned()) return false;
+    return fragment_.isPartitioned();
+  }
+
   public ExchangeNode(PlanNodeId id, PlanNode input) {
     super(id, "EXCHANGE");
     offset_ = 0;
@@ -136,7 +156,7 @@ public class ExchangeNode extends PlanNode {
       output.append(detailPrefix + "offset: ").append(offset_).append("\n");
     }
 
-    if (mergeInfo_ != null && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+    if (isMergingExchange() && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
       output.append(detailPrefix + "order by: ");
       for (int i = 0; i < mergeInfo_.getSortExprs().size(); ++i) {
         if (i > 0) output.append(", ");
@@ -160,14 +180,11 @@ public class ExchangeNode extends PlanNode {
     Preconditions.checkState(!children_.isEmpty());
     DataSink sink = getChild(0).getFragment().getSink();
     if (sink == null) return "";
-    Preconditions.checkState(sink instanceof DataStreamSink);
-    DataStreamSink streamSink = (DataStreamSink) sink;
-    if (!streamSink.getOutputPartition().isPartitioned() &&
-        fragment_.isPartitioned()) {
-      // If the output of the sink is not partitioned but the target fragment is
-      // partitioned, then the data exchange is broadcast.
+    if (isBroadcastExchange()) {
       return "BROADCAST";
     } else {
+      Preconditions.checkState(sink instanceof DataStreamSink);
+      DataStreamSink streamSink = (DataStreamSink) sink;
       return streamSink.getOutputPartition().getExplainString();
     }
   }
@@ -183,8 +200,66 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
-    // TODO: add an estimate
-    nodeResourceProfile_ =  ResourceProfile.noReservation(0);
+    // Non-merging exchanges keep a single row batch queue shared by all sender
+    // fragment instances; merging exchanges keep one queue per distinct sender.
+    // Every row batch queue has a soft limit of FLAGS_exchg_node_buffer_size_bytes
+    // (default 10MB). In addition, a deferred RPC queue holds at most one RPC
+    // payload (containing a row batch) per sender in case the row batch queue
+    // hits that soft limit. Actual memory use depends on the row size (which can
+    // vary a lot in the presence of variable-length strings) and on the rate at
+    // which rows are received and consumed by the exchange node, which in turn
+    // depends on the complexity of the query and on the system load. This makes
+    // the runtime memory usage hard to estimate accurately, so the estimates
+    // below assume that memory usage leans towards the soft limits. The sum of
+    // both queue estimates is floored at MIN_ESTIMATE_BYTES; numSenders is the
+    // instance count of the sending (child) fragment.
+    Preconditions.checkState(!children_.isEmpty());
+    Preconditions.checkNotNull(children_.get(0).getFragment());
+    int numSenders = children_.get(0).getFragment().getNumInstances(queryOptions.mt_dop);
+    long estimatedTotalQueueByteSize = estimateTotalQueueByteSize(numSenders);
+    long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions,
+        numSenders);
+    long estimatedMem = Math.max(
+        checkedAdd(estimatedTotalQueueByteSize, estimatedDeferredRPCQueueSize),
+        MIN_ESTIMATE_BYTES);
+    nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem);
+  }
+
+  // Estimates the size in bytes of the deferred RPC queue, assuming it holds
+  // exactly one row batch RPC payload from every sender.
+  private long estimateDeferredRPCQueueSize(TQueryOptions queryOptions,
+      int numSenders) {
+    long rowsPerBatch = DEFAULT_ROWBATCH_SIZE;
+    if (queryOptions.isSetBatch_size() && queryOptions.batch_size > 0) {
+      rowsPerBatch = queryOptions.batch_size;
+    }
+    // Cap the rows per batch at the estimated cardinality when it is positive.
+    long cardinality = getCardinality();
+    if (cardinality > 0 && cardinality < rowsPerBatch) rowsPerBatch = cardinality;
+    double rawBatchBytes = Math.ceil(rowsPerBatch * getAvgSerializedRowSize(this));
+    return Math.min((long) rawBatchBytes, ROWBATCH_MAX_MEM_USAGE) * numSenders;
+  }
+
+  // Returns the total estimated size (in bytes) of the row batch queues by
+  // assuming each queue fills up to its soft mem limit, capped by the bytes
+  // this node is expected to receive when the cardinality is known.
+  private long estimateTotalQueueByteSize(int numSenders) {
+    int numQueues = isMergingExchange() ? numSenders : 1;
+    long maxQueueByteSize = BackendConfig.INSTANCE.getBackendCfg().
+        exchg_node_buffer_size_bytes;
+    // TODO: Should we set a better default size here? This might be a lot for
+    // queries without stats.
+    long estimatedTotalQueueByteSize = numQueues * maxQueueByteSize;
+    // A negative (presumably unknown, -1) cardinality must not be used as a cap:
+    // it would make the estimate negative. Assumes no skew in the shuffle.
+    if (getCardinality() >= 0) {
+      long totalBytesToReceive = (long) Math.ceil(getAvgRowSize() * getCardinality());
+      int numNodes = getNumNodes();
+      long bytesToReceivePerExchNode = isBroadcastExchange() || numNodes <= 0
+          ? totalBytesToReceive : totalBytesToReceive / numNodes;
+      return Math.min(bytesToReceivePerExchNode, estimatedTotalQueueByteSize);
+    }
+    return estimatedTotalQueueByteSize;
   }
 
   @Override
@@ -202,7 +277,7 @@ public class ExchangeNode extends PlanNode {
       msg.exchange_node.addToInput_row_tuples(tid.asInt());
     }
 
-    if (mergeInfo_ != null) {
+    if (isMergingExchange()) {
       TSortInfo sortInfo = new TSortInfo(
           Expr.treesToThrift(mergeInfo_.getSortExprs()), mergeInfo_.getIsAscOrder(),
           mergeInfo_.getNullsFirst());

http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 927cc18..744aa09 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -68,6 +68,15 @@ import com.google.common.math.LongMath;
 abstract public class PlanNode extends TreeNode<PlanNode> {
   private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);
 
+  // The default row batch size used if the BATCH_SIZE query option is not set
+  // or is less than 1. Must be in sync with QueryState::DEFAULT_BATCH_SIZE.
+  protected final static int DEFAULT_ROWBATCH_SIZE = 1024;
+
+  // Max memory that a row batch can accumulate before it is considered at capacity.
+  // This is a soft capacity: row batches may exceed the capacity, preferably only by a
+  // row's worth of data. Must be in sync with RowBatch::AT_CAPACITY_MEM_USAGE.
+  protected final static int ROWBATCH_MAX_MEM_USAGE = 8 * 1024 * 1024;
+
   // String used for this node in getExplainString().
   protected String displayName_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index 8a59968..6ae50f3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -12,12 +12,12 @@ Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   01:EXCHANGE [UNPARTITIONED]
-     mem-estimate=0B mem-reservation=0B thread-reservation=0
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 
@@ -47,12 +47,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   01:EXCHANGE [UNPARTITIONED]
-     mem-estimate=0B mem-reservation=0B thread-reservation=0
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 
@@ -81,12 +81,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=517.89KB mem-reservation=0B thread-reservation=1
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   01:EXCHANGE [UNPARTITIONED]
-     mem-estimate=0B mem-reservation=0B thread-reservation=0
+     mem-estimate=517.89KB mem-reservation=0B thread-reservation=0
      tuple-ids=0 row-size=124B cardinality=3317
      in pipelines: 00(GETNEXT)
 
@@ -114,12 +114,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
      in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
   PLAN-ROOT SINK
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
   |
   01:EXCHANGE [UNPARTITIONED]
-     mem-estimate=0B mem-reservation=0B thread-reservation=0
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
      tuple-ids=0 row-size=124B cardinality=3
      in pipelines: 00(GETNEXT)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
index f82f1be..e1d8119 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -5,20 +5,20 @@ from tpch_parquet.customer
     inner join tpch_parquet.nation on c_nationkey = n_nationkey
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=33.97MB Threads=5
-Per-Host Resource Estimates: Memory=58MB
+Per-Host Resource Estimates: Memory=68MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=10.35MB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.35MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=355B cardinality=150000
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
-Per-Host Resources: mem-estimate=41.94MB mem-reservation=33.94MB thread-reservation=2 runtime-filters-memory=1.00MB
+Per-Host Resources: mem-estimate=41.95MB mem-reservation=33.94MB thread-reservation=2 runtime-filters-memory=1.00MB
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: c_nationkey = n_nationkey
 |  fk/pk conjuncts: c_nationkey = n_nationkey
@@ -28,7 +28,7 @@ Per-Host Resources: mem-estimate=41.94MB mem-reservation=33.94MB thread-reservat
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--03:EXCHANGE [BROADCAST]
-|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=117B cardinality=25
 |  |  in pipelines: 01(GETNEXT)
 |  |
@@ -62,20 +62,20 @@ from tpch_parquet.lineitem
     left join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=110.00MB Threads=5
-Per-Host Resource Estimates: Memory=420MB
+Per-Host Resource Estimates: Memory=442MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=11.35MB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=11.35MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1N row-size=454B cardinality=6001215
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reservation=2
+Per-Host Resources: mem-estimate=390.80MB mem-reservation=86.00MB thread-reservation=2
 02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
 |  hash predicates: l_orderkey = o_orderkey
 |  fk/pk conjuncts: l_orderkey = o_orderkey
@@ -84,7 +84,7 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reserva
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--03:EXCHANGE [BROADCAST]
-|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.38MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
 |  |  in pipelines: 01(GETNEXT)
 |  |
@@ -101,11 +101,11 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reserva
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.71MB
+   partitions=1/1 files=3 size=193.72MB
    stored statistics:
-     table: rows=6001215 size=193.71MB
+     table: rows=6001215 size=193.72MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=2141802
+   extrapolated-rows=disabled max-scan-range-rows=2141530
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
    in pipelines: 00(GETNEXT)
@@ -116,20 +116,20 @@ select * from tpch_parquet.lineitem
 where l_orderkey not in (select o_orderkey from tpch_parquet.orders)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=78.00MB Threads=5
-Per-Host Resource Estimates: Memory=154MB
+Per-Host Resource Estimates: Memory=175MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=10.78MB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.78MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reservation=2
+Per-Host Resources: mem-estimate=124.02MB mem-reservation=74.00MB thread-reservation=2
 02:HASH JOIN [NULL AWARE LEFT ANTI JOIN, BROADCAST]
 |  hash predicates: l_orderkey = o_orderkey
 |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=1.00MB thread-reservation=0
@@ -137,7 +137,7 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reserva
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--03:EXCHANGE [BROADCAST]
-|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=8B cardinality=1500000
 |  |  in pipelines: 01(GETNEXT)
 |  |
@@ -154,11 +154,11 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reserva
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.71MB
+   partitions=1/1 files=3 size=193.72MB
    stored statistics:
-     table: rows=6001215 size=193.71MB
+     table: rows=6001215 size=193.72MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=2141802
+   extrapolated-rows=disabled max-scan-range-rows=2141530
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
    in pipelines: 00(GETNEXT)
@@ -172,20 +172,20 @@ group by 1, 2
 having count(*) = 1
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=125.00MB Threads=7
-Per-Host Resource Estimates: Memory=253MB
+Per-Host Resource Estimates: Memory=293MB
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=10.11MB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 08:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |  in pipelines: 07(GETNEXT)
 |
 F03:PLAN FRAGMENT [HASH(l_orderkey,o_orderstatus)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=56.11MB mem-reservation=46.00MB thread-reservation=1
 07:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  group by: l_orderkey, o_orderstatus
@@ -195,12 +195,12 @@ Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB thread-reservat
 |  in pipelines: 07(GETNEXT), 00(OPEN)
 |
 06:EXCHANGE [HASH(l_orderkey,o_orderstatus)]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=33B cardinality=4690314
 |  in pipelines: 00(GETNEXT)
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservation=1 runtime-filters-memory=1.00MB
+Per-Host Resources: mem-estimate=106.22MB mem-reservation=66.00MB thread-reservation=1 runtime-filters-memory=1.00MB
 03:AGGREGATE [STREAMING]
 |  output: count(*)
 |  group by: l_orderkey, o_orderstatus
@@ -217,7 +217,7 @@ Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservat
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--05:EXCHANGE [HASH(o_orderkey)]
-|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  mem-estimate=10.06MB mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=25B cardinality=1500000
 |  |  in pipelines: 01(GETNEXT)
 |  |
@@ -234,19 +234,19 @@ Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservat
 |     in pipelines: 01(GETNEXT)
 |
 04:EXCHANGE [HASH(l_orderkey)]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=8B cardinality=6001215
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=81.00MB mem-reservation=5.00MB thread-reservation=2 runtime-filters-memory=1.00MB
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.71MB
+   partitions=1/1 files=3 size=193.72MB
    runtime filters: RF000[bloom] -> l_orderkey
    stored statistics:
-     table: rows=6001215 size=193.71MB
+     table: rows=6001215 size=193.72MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=2141802
+   extrapolated-rows=disabled max-scan-range-rows=2141530
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=6001215
    in pipelines: 00(GETNEXT)
@@ -257,20 +257,20 @@ select distinct *
 from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=120.00MB Threads=4
-Per-Host Resource Estimates: Memory=3.31GB
+Per-Host Resource Estimates: Memory=3.33GB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=10.78MB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.78MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=1.63GB mem-reservation=46.00MB thread-reservation=1
 03:AGGREGATE [FINALIZE]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
 |  mem-estimate=1.62GB mem-reservation=46.00MB spill-buffer=2.00MB thread-reservation=0
@@ -278,7 +278,7 @@ Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB thread-reservati
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.78MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
 |  in pipelines: 00(GETNEXT)
 |
@@ -291,11 +291,11 @@ Per-Host Resources: mem-estimate=1.69GB mem-reservation=74.00MB thread-reservati
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.71MB
+   partitions=1/1 files=3 size=193.72MB
    stored statistics:
-     table: rows=6001215 size=193.71MB
+     table: rows=6001215 size=193.72MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=2141802
+   extrapolated-rows=disabled max-scan-range-rows=2141530
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
    in pipelines: 00(GETNEXT)
@@ -307,20 +307,20 @@ from tpch_parquet.lineitem
 group by 1, 2
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=98.00MB Threads=4
-Per-Host Resource Estimates: Memory=483MB
+Per-Host Resource Estimates: Memory=503MB
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=10.11MB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=32B cardinality=6001215
 |  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(l_orderkey,l_partkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=211.56MB mem-reservation=48.00MB thread-reservation=1
 03:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(l_linestatus, ',')
 |  group by: l_orderkey, l_partkey
@@ -329,7 +329,7 @@ Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB thread-reserva
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(l_orderkey,l_partkey)]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=32B cardinality=6001215
 |  in pipelines: 00(GETNEXT)
 |
@@ -343,11 +343,11 @@ Per-Host Resources: mem-estimate=281.46MB mem-reservation=50.00MB thread-reserva
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
-   partitions=1/1 files=3 size=193.71MB
+   partitions=1/1 files=3 size=193.72MB
    stored statistics:
-     table: rows=6001215 size=193.71MB
+     table: rows=6001215 size=193.72MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=2141802
+   extrapolated-rows=disabled max-scan-range-rows=2141530
    mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=1
    tuple-ids=0 row-size=33B cardinality=6001215
    in pipelines: 00(GETNEXT)
@@ -362,17 +362,17 @@ Per-Host Resource Estimates: Memory=56MB
 Codegen disabled by planner
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=56.26KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=56.26KB mem-reservation=0B thread-reservation=0
 |  tuple-ids=3,2 row-size=6B cardinality=7300
 |  in pipelines: 01(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=40.04MB mem-reservation=40.00MB thread-reservation=1
 02:ANALYTIC
 |  functions: max(tinyint_col)
 |  partition by: int_col
@@ -387,7 +387,7 @@ Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB thread-reservat
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 03:EXCHANGE [HASH(int_col)]
-|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  mem-estimate=38.88KB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=5B cardinality=7300
 |  in pipelines: 00(GETNEXT)
 |