You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bi...@apache.org on 2018/10/25 23:18:12 UTC
[4/4] impala git commit: IMPALA-7351: Add estimates to Exchange node
IMPALA-7351: Add estimates to Exchange node
Added rough memory estimates for the exchange node and a justification of
the method in the in-line comments.
Testing:
Updated Planner tests.
Change-Id: I5b577f9511abc48b992e814d50bba4959f23f7fd
Reviewed-on: http://gerrit.cloudera.org:8080/11692
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8cbec20e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8cbec20e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8cbec20e
Branch: refs/heads/master
Commit: 8cbec20ea4f1399fce14018d30050d4b7ee501bc
Parents: 1d72c75
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Oct 11 12:16:11 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 25 21:17:12 2018 +0000
----------------------------------------------------------------------
be/src/util/backend-gflag-util.cc | 3 +
common/thrift/BackendGflags.thrift | 2 +
.../org/apache/impala/planner/ExchangeNode.java | 95 +-
.../org/apache/impala/planner/PlanNode.java | 9 +
.../queries/PlannerTest/kudu-selectivity.test | 16 +-
.../queries/PlannerTest/max-row-size.test | 104 +-
.../PlannerTest/resource-requirements.test | 972 +++++++++----------
.../PlannerTest/spillable-buffer-sizing.test | 214 ++--
.../queries/PlannerTest/tpcds-all.test | 60 +-
.../queries/PlannerTest/tpch-all.test | 64 +-
.../queries/PlannerTest/tpch-nested.test | 172 ++--
.../queries/QueryTest/explain-level2.test | 18 +-
.../queries/QueryTest/explain-level3.test | 22 +-
13 files changed, 920 insertions(+), 831 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 3760c84..9f4f2d0 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -67,6 +67,7 @@ DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
+DECLARE_int64(exchg_node_buffer_size_bytes);
namespace impala {
@@ -132,6 +133,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
FLAGS_catalog_max_parallel_partial_fetch_rpc);
cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s(
FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
+ cfg.__set_exchg_node_buffer_size_bytes(
+ FLAGS_exchg_node_buffer_size_bytes);
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 531fc2c..dff4b8c 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -107,4 +107,6 @@ struct TBackendGflags {
40: required i32 catalog_max_parallel_partial_fetch_rpc
41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
+
+ 42: required i64 exchg_node_buffer_size_bytes
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 05184f2..356ae6b 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -22,12 +22,14 @@ import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.SortInfo;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExchangeNode;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TSortInfo;
+
import com.google.common.base.Preconditions;
/**
@@ -49,6 +51,9 @@ public class ExchangeNode extends PlanNode {
// update this constant as well.
private static final double PER_TUPLE_SERIALIZATION_OVERHEAD = 4.0;
+ // Empirically derived minimum estimate (in bytes) for the exchange node.
+ private static final int MIN_ESTIMATE_BYTES = 16 * 1024;
+
// The parameters based on which sorted input streams are merged by this
// exchange node. Null if this exchange does not merge sorted streams
private SortInfo mergeInfo_;
@@ -57,6 +62,21 @@ public class ExchangeNode extends PlanNode {
// only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
private long offset_;
+ private boolean isMergingExchange() {
+ return mergeInfo_ != null;
+ }
+
+ private boolean isBroadcastExchange() {
+ // If the output of the sink is not partitioned but the target fragment is
+ // partitioned, then the data exchange is broadcast.
+ Preconditions.checkState(!children_.isEmpty());
+ DataSink sink = getChild(0).getFragment().getSink();
+ if (sink == null) return false;
+ Preconditions.checkState(sink instanceof DataStreamSink);
+ DataStreamSink streamSink = (DataStreamSink) sink;
+ return !streamSink.getOutputPartition().isPartitioned() && fragment_.isPartitioned();
+ }
+
public ExchangeNode(PlanNodeId id, PlanNode input) {
super(id, "EXCHANGE");
offset_ = 0;
@@ -136,7 +156,7 @@ public class ExchangeNode extends PlanNode {
output.append(detailPrefix + "offset: ").append(offset_).append("\n");
}
- if (mergeInfo_ != null && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+ if (isMergingExchange() && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
output.append(detailPrefix + "order by: ");
for (int i = 0; i < mergeInfo_.getSortExprs().size(); ++i) {
if (i > 0) output.append(", ");
@@ -160,14 +180,11 @@ public class ExchangeNode extends PlanNode {
Preconditions.checkState(!children_.isEmpty());
DataSink sink = getChild(0).getFragment().getSink();
if (sink == null) return "";
- Preconditions.checkState(sink instanceof DataStreamSink);
- DataStreamSink streamSink = (DataStreamSink) sink;
- if (!streamSink.getOutputPartition().isPartitioned() &&
- fragment_.isPartitioned()) {
- // If the output of the sink is not partitioned but the target fragment is
- // partitioned, then the data exchange is broadcast.
+ if (isBroadcastExchange()) {
return "BROADCAST";
} else {
+ Preconditions.checkState(sink instanceof DataStreamSink);
+ DataStreamSink streamSink = (DataStreamSink) sink;
return streamSink.getOutputPartition().getExplainString();
}
}
@@ -183,8 +200,66 @@ public class ExchangeNode extends PlanNode {
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
- // TODO: add an estimate
- nodeResourceProfile_ = ResourceProfile.noReservation(0);
+ // For non-merging exchanges, one row batch queue is maintained for row
+ // batches from all sender fragment instances. For merging exchange, one
+ // queue is created for the batches from each distinct sender. There is a
+ // soft limit on every row batch queue of FLAGS_exchg_node_buffer_size_bytes
+ // (default 10MB). There is also a deferred rpc queue which queues at most
+ // one rpc payload (containing the row batch) per sender in case the row
+ // batch queue hits the previously mentioned soft limit. Actual memory used
+ // depends on the row size (that can vary a lot due to presence of var len
+ // strings) and on the rate at which rows are received and consumed from the
+ // exchange node which in turn depends on the complexity of the query and
+ // the system load. This makes it difficult to accurately estimate the
+ // memory usage at runtime. The following estimates assume that memory usage will
+ // lean towards the soft limits.
+ Preconditions.checkState(!children_.isEmpty());
+ Preconditions.checkNotNull(children_.get(0).getFragment());
+ int numSenders = children_.get(0).getFragment().getNumInstances(queryOptions.mt_dop);
+ long estimatedTotalQueueByteSize = estimateTotalQueueByteSize(numSenders);
+ long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions,
+ numSenders);
+ long estimatedMem = Math.max(
+ checkedAdd(estimatedTotalQueueByteSize, estimatedDeferredRPCQueueSize),
+ MIN_ESTIMATE_BYTES);
+ nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem);
+ }
+
+ // Returns the estimated size of the deferred batch queue (in bytes) by
+ // assuming that at least one row batch rpc payload per sender is queued.
+ private long estimateDeferredRPCQueueSize(TQueryOptions queryOptions,
+ int numSenders) {
+ long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0
+ ? queryOptions.batch_size : DEFAULT_ROWBATCH_SIZE;
+ // Set an upper limit based on estimated cardinality.
+ if (getCardinality() > 0) rowBatchSize = Math.min(rowBatchSize, getCardinality());
+ long avgRowBatchByteSize = Math.min(
+ (long) Math.ceil(rowBatchSize * getAvgSerializedRowSize(this)),
+ ROWBATCH_MAX_MEM_USAGE);
+ long deferredBatchQueueSize = avgRowBatchByteSize * numSenders;
+ return deferredBatchQueueSize;
+ }
+
+ // Returns the total estimated size (in bytes) of the row batch queues by
+ // assuming enough batches can be queued such that it hits the row batch
+ // queue's soft mem limit.
+ private long estimateTotalQueueByteSize(int numSenders) {
+ int numQueues = isMergingExchange() ? numSenders : 1;
+ long maxQueueByteSize = BackendConfig.INSTANCE.getBackendCfg().
+ exchg_node_buffer_size_bytes;
+ // TODO: Should we set a better default size here? This might be a lot for
+ // queries without stats.
+ long estimatedTotalQueueByteSize = numQueues * maxQueueByteSize;
+ // Set an upper limit based on estimated cardinality.
+ if (hasValidStats()) {
+ long totalBytesToReceive = (long) Math.ceil(getAvgRowSize() * getCardinality());
+ // Assuming no skew in distribution during data shuffling.
+ long bytesToReceivePerExchNode = isBroadcastExchange() ? totalBytesToReceive
+ : totalBytesToReceive / getNumNodes();
+ estimatedTotalQueueByteSize = Math.min(bytesToReceivePerExchNode,
+ estimatedTotalQueueByteSize);
+ }
+ return estimatedTotalQueueByteSize;
}
@Override
@@ -202,7 +277,7 @@ public class ExchangeNode extends PlanNode {
msg.exchange_node.addToInput_row_tuples(tid.asInt());
}
- if (mergeInfo_ != null) {
+ if (isMergingExchange()) {
TSortInfo sortInfo = new TSortInfo(
Expr.treesToThrift(mergeInfo_.getSortExprs()), mergeInfo_.getIsAscOrder(),
mergeInfo_.getNullsFirst());
http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 927cc18..744aa09 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -68,6 +68,15 @@ import com.google.common.math.LongMath;
abstract public class PlanNode extends TreeNode<PlanNode> {
private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);
+ // The default row batch size used if the BATCH_SIZE query option is not set
+ // or is less than 1. Must be in sync with QueryState::DEFAULT_BATCH_SIZE.
+ protected final static int DEFAULT_ROWBATCH_SIZE = 1024;
+
+ // Max memory that a row batch can accumulate before it is considered at capacity.
+ // This is a soft capacity: row batches may exceed the capacity, preferably only by a
+ // row's worth of data. Must be in sync with RowBatch::AT_CAPACITY_MEM_USAGE.
+ protected final static int ROWBATCH_MAX_MEM_USAGE = 8 * 1024 * 1024;
+
// String used for this node in getExplainString().
protected String displayName_;
http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index 8a59968..6ae50f3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -12,12 +12,12 @@ Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
01:EXCHANGE [UNPARTITIONED]
- mem-estimate=0B mem-reservation=0B thread-reservation=0
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
tuple-ids=0 row-size=124B cardinality=1
in pipelines: 00(GETNEXT)
@@ -47,12 +47,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
01:EXCHANGE [UNPARTITIONED]
- mem-estimate=0B mem-reservation=0B thread-reservation=0
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
tuple-ids=0 row-size=124B cardinality=1
in pipelines: 00(GETNEXT)
@@ -81,12 +81,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=517.89KB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
01:EXCHANGE [UNPARTITIONED]
- mem-estimate=0B mem-reservation=0B thread-reservation=0
+ mem-estimate=517.89KB mem-reservation=0B thread-reservation=0
tuple-ids=0 row-size=124B cardinality=3317
in pipelines: 00(GETNEXT)
@@ -114,12 +114,12 @@ Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
01:EXCHANGE [UNPARTITIONED]
- mem-estimate=0B mem-reservation=0B thread-reservation=0
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
tuple-ids=0 row-size=124B cardinality=3
in pipelines: 00(GETNEXT)
http://git-wip-us.apache.org/repos/asf/impala/blob/8cbec20e/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
index f82f1be..e1d8119 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -5,20 +5,20 @@ from tpch_parquet.customer
inner join tpch_parquet.nation on c_nationkey = n_nationkey
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=33.97MB Threads=5
-Per-Host Resource Estimates: Memory=58MB
+Per-Host Resource Estimates: Memory=68MB
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=10.35MB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.35MB mem-reservation=0B thread-reservation=0
| tuple-ids=0,1 row-size=355B cardinality=150000
| in pipelines: 00(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
-Per-Host Resources: mem-estimate=41.94MB mem-reservation=33.94MB thread-reservation=2 runtime-filters-memory=1.00MB
+Per-Host Resources: mem-estimate=41.95MB mem-reservation=33.94MB thread-reservation=2 runtime-filters-memory=1.00MB
02:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: c_nationkey = n_nationkey
| fk/pk conjuncts: c_nationkey = n_nationkey
@@ -28,7 +28,7 @@ Per-Host Resources: mem-estimate=41.94MB mem-reservation=33.94MB thread-reservat
| in pipelines: 00(GETNEXT), 01(OPEN)
|
|--03:EXCHANGE [BROADCAST]
-| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
| | tuple-ids=1 row-size=117B cardinality=25
| | in pipelines: 01(GETNEXT)
| |
@@ -62,20 +62,20 @@ from tpch_parquet.lineitem
left join tpch_parquet.orders on l_orderkey = o_orderkey
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=110.00MB Threads=5
-Per-Host Resource Estimates: Memory=420MB
+Per-Host Resource Estimates: Memory=442MB
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=11.35MB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=11.35MB mem-reservation=0B thread-reservation=0
| tuple-ids=0,1N row-size=454B cardinality=6001215
| in pipelines: 00(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reservation=2
+Per-Host Resources: mem-estimate=390.80MB mem-reservation=86.00MB thread-reservation=2
02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| hash predicates: l_orderkey = o_orderkey
| fk/pk conjuncts: l_orderkey = o_orderkey
@@ -84,7 +84,7 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reserva
| in pipelines: 00(GETNEXT), 01(OPEN)
|
|--03:EXCHANGE [BROADCAST]
-| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| | mem-estimate=10.38MB mem-reservation=0B thread-reservation=0
| | tuple-ids=1 row-size=191B cardinality=1500000
| | in pipelines: 01(GETNEXT)
| |
@@ -101,11 +101,11 @@ Per-Host Resources: mem-estimate=380.41MB mem-reservation=86.00MB thread-reserva
| in pipelines: 01(GETNEXT)
|
00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
- partitions=1/1 files=3 size=193.71MB
+ partitions=1/1 files=3 size=193.72MB
stored statistics:
- table: rows=6001215 size=193.71MB
+ table: rows=6001215 size=193.72MB
columns: all
- extrapolated-rows=disabled max-scan-range-rows=2141802
+ extrapolated-rows=disabled max-scan-range-rows=2141530
mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
tuple-ids=0 row-size=263B cardinality=6001215
in pipelines: 00(GETNEXT)
@@ -116,20 +116,20 @@ select * from tpch_parquet.lineitem
where l_orderkey not in (select o_orderkey from tpch_parquet.orders)
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=78.00MB Threads=5
-Per-Host Resource Estimates: Memory=154MB
+Per-Host Resource Estimates: Memory=175MB
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=10.78MB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.78MB mem-reservation=0B thread-reservation=0
| tuple-ids=0 row-size=263B cardinality=6001215
| in pipelines: 00(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reservation=2
+Per-Host Resources: mem-estimate=124.02MB mem-reservation=74.00MB thread-reservation=2
02:HASH JOIN [NULL AWARE LEFT ANTI JOIN, BROADCAST]
| hash predicates: l_orderkey = o_orderkey
| mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=1.00MB thread-reservation=0
@@ -137,7 +137,7 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reserva
| in pipelines: 00(GETNEXT), 01(OPEN)
|
|--03:EXCHANGE [BROADCAST]
-| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| | mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
| | tuple-ids=1 row-size=8B cardinality=1500000
| | in pipelines: 01(GETNEXT)
| |
@@ -154,11 +154,11 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=74.00MB thread-reserva
| in pipelines: 01(GETNEXT)
|
00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
- partitions=1/1 files=3 size=193.71MB
+ partitions=1/1 files=3 size=193.72MB
stored statistics:
- table: rows=6001215 size=193.71MB
+ table: rows=6001215 size=193.72MB
columns: all
- extrapolated-rows=disabled max-scan-range-rows=2141802
+ extrapolated-rows=disabled max-scan-range-rows=2141530
mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
tuple-ids=0 row-size=263B cardinality=6001215
in pipelines: 00(GETNEXT)
@@ -172,20 +172,20 @@ group by 1, 2
having count(*) = 1
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=125.00MB Threads=7
-Per-Host Resource Estimates: Memory=253MB
+Per-Host Resource Estimates: Memory=293MB
F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=10.11MB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
08:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
| tuple-ids=2 row-size=33B cardinality=4690314
| in pipelines: 07(GETNEXT)
|
F03:PLAN FRAGMENT [HASH(l_orderkey,o_orderstatus)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=56.11MB mem-reservation=46.00MB thread-reservation=1
07:AGGREGATE [FINALIZE]
| output: count:merge(*)
| group by: l_orderkey, o_orderstatus
@@ -195,12 +195,12 @@ Per-Host Resources: mem-estimate=46.00MB mem-reservation=46.00MB thread-reservat
| in pipelines: 07(GETNEXT), 00(OPEN)
|
06:EXCHANGE [HASH(l_orderkey,o_orderstatus)]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
| tuple-ids=2 row-size=33B cardinality=4690314
| in pipelines: 00(GETNEXT)
|
F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservation=1 runtime-filters-memory=1.00MB
+Per-Host Resources: mem-estimate=106.22MB mem-reservation=66.00MB thread-reservation=1 runtime-filters-memory=1.00MB
03:AGGREGATE [STREAMING]
| output: count(*)
| group by: l_orderkey, o_orderstatus
@@ -217,7 +217,7 @@ Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservat
| in pipelines: 00(GETNEXT), 01(OPEN)
|
|--05:EXCHANGE [HASH(o_orderkey)]
-| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| | mem-estimate=10.06MB mem-reservation=0B thread-reservation=0
| | tuple-ids=1 row-size=25B cardinality=1500000
| | in pipelines: 01(GETNEXT)
| |
@@ -234,19 +234,19 @@ Per-Host Resources: mem-estimate=86.12MB mem-reservation=66.00MB thread-reservat
| in pipelines: 01(GETNEXT)
|
04:EXCHANGE [HASH(l_orderkey)]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
| tuple-ids=0 row-size=8B cardinality=6001215
| in pipelines: 00(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
Per-Host Resources: mem-estimate=81.00MB mem-reservation=5.00MB thread-reservation=2 runtime-filters-memory=1.00MB
00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
- partitions=1/1 files=3 size=193.71MB
+ partitions=1/1 files=3 size=193.72MB
runtime filters: RF000[bloom] -> l_orderkey
stored statistics:
- table: rows=6001215 size=193.71MB
+ table: rows=6001215 size=193.72MB
columns: all
- extrapolated-rows=disabled max-scan-range-rows=2141802
+ extrapolated-rows=disabled max-scan-range-rows=2141530
mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
tuple-ids=0 row-size=8B cardinality=6001215
in pipelines: 00(GETNEXT)
@@ -257,20 +257,20 @@ select distinct *
from tpch_parquet.lineitem
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=120.00MB Threads=4
-Per-Host Resource Estimates: Memory=3.31GB
+Per-Host Resource Estimates: Memory=3.33GB
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=10.78MB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.78MB mem-reservation=0B thread-reservation=0
| tuple-ids=1 row-size=263B cardinality=6001215
| in pipelines: 03(GETNEXT)
|
F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=1.63GB mem-reservation=46.00MB thread-reservation=1
03:AGGREGATE [FINALIZE]
| group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
| mem-estimate=1.62GB mem-reservation=46.00MB spill-buffer=2.00MB thread-reservation=0
@@ -278,7 +278,7 @@ Per-Host Resources: mem-estimate=1.62GB mem-reservation=46.00MB thread-reservati
| in pipelines: 03(GETNEXT), 00(OPEN)
|
02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.78MB mem-reservation=0B thread-reservation=0
| tuple-ids=1 row-size=263B cardinality=6001215
| in pipelines: 00(GETNEXT)
|
@@ -291,11 +291,11 @@ Per-Host Resources: mem-estimate=1.69GB mem-reservation=74.00MB thread-reservati
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
- partitions=1/1 files=3 size=193.71MB
+ partitions=1/1 files=3 size=193.72MB
stored statistics:
- table: rows=6001215 size=193.71MB
+ table: rows=6001215 size=193.72MB
columns: all
- extrapolated-rows=disabled max-scan-range-rows=2141802
+ extrapolated-rows=disabled max-scan-range-rows=2141530
mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
tuple-ids=0 row-size=263B cardinality=6001215
in pipelines: 00(GETNEXT)
@@ -307,20 +307,20 @@ from tpch_parquet.lineitem
group by 1, 2
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=98.00MB Threads=4
-Per-Host Resource Estimates: Memory=483MB
+Per-Host Resource Estimates: Memory=503MB
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=10.11MB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
| tuple-ids=1 row-size=32B cardinality=6001215
| in pipelines: 03(GETNEXT)
|
F01:PLAN FRAGMENT [HASH(l_orderkey,l_partkey)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=211.56MB mem-reservation=48.00MB thread-reservation=1
03:AGGREGATE [FINALIZE]
| output: group_concat:merge(l_linestatus, ',')
| group by: l_orderkey, l_partkey
@@ -329,7 +329,7 @@ Per-Host Resources: mem-estimate=201.46MB mem-reservation=48.00MB thread-reserva
| in pipelines: 03(GETNEXT), 00(OPEN)
|
02:EXCHANGE [HASH(l_orderkey,l_partkey)]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=10.11MB mem-reservation=0B thread-reservation=0
| tuple-ids=1 row-size=32B cardinality=6001215
| in pipelines: 00(GETNEXT)
|
@@ -343,11 +343,11 @@ Per-Host Resources: mem-estimate=281.46MB mem-reservation=50.00MB thread-reserva
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
- partitions=1/1 files=3 size=193.71MB
+ partitions=1/1 files=3 size=193.72MB
stored statistics:
- table: rows=6001215 size=193.71MB
+ table: rows=6001215 size=193.72MB
columns: all
- extrapolated-rows=disabled max-scan-range-rows=2141802
+ extrapolated-rows=disabled max-scan-range-rows=2141530
mem-estimate=80.00MB mem-reservation=16.00MB thread-reservation=1
tuple-ids=0 row-size=33B cardinality=6001215
in pipelines: 00(GETNEXT)
@@ -362,17 +362,17 @@ Per-Host Resource Estimates: Memory=56MB
Codegen disabled by planner
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=56.26KB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=56.26KB mem-reservation=0B thread-reservation=0
| tuple-ids=3,2 row-size=6B cardinality=7300
| in pipelines: 01(GETNEXT)
|
F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=40.04MB mem-reservation=40.00MB thread-reservation=1
02:ANALYTIC
| functions: max(tinyint_col)
| partition by: int_col
@@ -387,7 +387,7 @@ Per-Host Resources: mem-estimate=40.00MB mem-reservation=40.00MB thread-reservat
| in pipelines: 01(GETNEXT), 00(OPEN)
|
03:EXCHANGE [HASH(int_col)]
-| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| mem-estimate=38.88KB mem-reservation=0B thread-reservation=0
| tuple-ids=0 row-size=5B cardinality=7300
| in pipelines: 00(GETNEXT)
|