Posted to commits@impala.apache.org by cs...@apache.org on 2023/03/09 14:13:24 UTC
[impala] 02/07: IMPALA-11604 (part 1): Model ProcessingCost for PlanNodes & DataSink
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 29ad046d05869bed7489bc487636e0f64b3328aa
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Thu Sep 22 16:36:34 2022 -0400
IMPALA-11604 (part 1): Model ProcessingCost for PlanNodes & DataSink
This patch augments IMPALA-10992 by establishing a model that allows the
weighted total amount of data to be processed to serve as a new factor in
the definition and selection of an executor group. We call this model
ProcessingCost.
ProcessingCost of a PlanNode/DataSink is a weighted amount of data
processed by that node/sink. The basic ProcessingCost is computed with a
general formula as follows.
ProcessingCost is a pair: PC(D, N), where D = I * (C + M)
where D is the weighted amount of data processed
I is the input cardinality
C is the expression evaluation cost per row.
Set to the total weight of expression evaluation in the node/sink.
M is the materialization cost per row.
Only used by scan nodes, exchange nodes, and DataStreamSink. Otherwise, 0.
N is the number of instances.
Defaults to D / 10000000.
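The general formula can be sketched in Java as follows. This is a minimal, hypothetical illustration: the class and method names are not part of Impala, the computation of D mirrors BaseProcessingCost.getTotalCost() in the diff below (a negative cardinality is clamped to 0 and the result is rounded up), and rounding the default N up with a floor of one instance is an assumption, since the text only says N defaults to D / 10000000.

```java
/**
 * Hypothetical sketch of the basic ProcessingCost formula PC(D, N), where D = I * (C + M).
 * Not Impala code; only the D computation mirrors BaseProcessingCost.getTotalCost().
 */
final class BasicCostSketch {
  // Divisor from the description: N defaults to D / 10000000.
  static final long COST_PER_INSTANCE = 10_000_000L;

  /** D = I * (C + M), clamping a negative cardinality to 0 and rounding up. */
  static long totalCost(long inputCardinality, float exprsCost, float materializationCost) {
    return (long) Math.ceil(Math.max(inputCardinality, 0) * (exprsCost + materializationCost));
  }

  /** Default N = D / 10000000; the ceiling and the minimum of 1 are assumptions. */
  static long defaultNumInstances(long totalCost) {
    long n = (totalCost + COST_PER_INSTANCE - 1) / COST_PER_INSTANCE;
    return Math.max(1L, n);
  }

  public static void main(String[] args) {
    // 50M input rows, one predicate with weight 1 (C = 1), and 512-byte rows
    // (M = 512 / 1024 = 0.5, as for a scan node).
    long d = totalCost(50_000_000L, 1.0f, 512f / 1024);
    System.out.println("D=" + d + " N=" + defaultNumInstances(d)); // prints "D=75000000 N=8"
  }
}
```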
In this patch, the weight of each expression evaluation is set to a
constant of 1. A description of the computation for each kind of
PlanNode/DataSink is given below.
01. AggregationNode:
Each AggregateInfo computes C as the sum of its grouping expression and
aggregate expression costs and is assigned its own ProcessingCost.
These ProcessingCosts are then summed to form the AggregationNode's
ProcessingCost;
02. AnalyticEvalNode:
C is the sum of the evaluation costs for analytic functions;
03. CardinalityCheckNode:
Use the general formula, I = 1;
04. DataSourceScanNode:
Follow the formula from the superclass ScanNode;
05. EmptySetNode:
I = 0;
06. ExchangeNode:
M = (average serialized row size) / 1024
A modification of the general formula when in broadcast mode:
D = D * number of receivers;
07. HashJoinNode:
probe cost = PC(I0 * C(equiJoin predicate), N) +
PC(output cardinality * C(otherJoin predicate), N)
build cost = PC(I1 * C(equiJoin predicate), N)
where I0 and I1 are the input cardinalities of the probe and build sides,
respectively. If the join node does not have a separate build, its
ProcessingCost is the sum of the probe cost and the build cost. Otherwise,
its ProcessingCost equals the probe cost.
08. HbaseScanNode, HdfsScanNode, and KuduScanNode:
Follow the formula from the superclass ScanNode;
09. Nested loop join node:
When the right child is not a SingularRowSrcNode:
probe cost = PC(I0 * C(equiJoin predicate), N) +
PC(output cardinality * C(otherJoin predicate), N)
build cost = PC(I1 * C(equiJoin predicate), N)
When the right child is a SingularRowSrcNode:
probe cost = PC(I0, N)
build cost = PC(I0 * I1, N)
where I0 and I1 are the input cardinalities of the probe and build sides,
respectively. If the join node does not have a separate build, its
ProcessingCost is the sum of the probe cost and the build cost. Otherwise,
its ProcessingCost equals the probe cost.
10. ScanNode:
M = (average row size) / 1024;
11. SelectNode:
Use the general formula;
12. SingularRowSrcNode:
Since the node is involved once per input in a nested loop join, its
contribution is computed as part of the nested loop join node;
13. SortNode:
C is the evaluation cost for the sort expression;
14. SubplanNode:
C is 1. I is the product of the cardinalities of the left and
right children;
15. Union node:
C is the cost of result expression evaluation from all non-pass-through
children;
16. Unnest node:
I is the cardinality of the containing SubplanNode and C is 1.
17. DataStreamSink:
C = 0 and M = (average serialized row size) / 1024, the same per-row
serialization cost that the receiving ExchangeNode uses.
18. JoinBuildSink:
ProcessingCost is the build cost of its associated JoinNode.
19. PlanRootSink:
If result spooling is enabled, C is the cost of output expression
evaluation. Otherwise, ProcessingCost is zero.
20. TableSink:
C is the cost of output expression evaluation.
TableSink subclasses (including HBaseTableSink, HdfsTableSink, and
KuduTableSink) follow the same formula;
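Several of the per-node rules above can be illustrated with a hypothetical Java sketch that works on plain long costs instead of the patch's ProcessingCost objects; only a subset of the nodes is covered. The class and method names are invented for illustration. The saturating add and multiply are assumptions (the diff below calls MathUtil.saturatingMultiply, but its body is not shown), and combining costs by simple addition is assumed to match what SumProcessingCost does.

```java
/**
 * Hypothetical sketch of how the per-node rules combine basic costs. Costs are plain
 * longs here; the patch wraps them in ProcessingCost objects. Names are illustrative.
 */
final class NodeCostSketch {
  /** D = I * (C + M), rounded up; mirrors BaseProcessingCost.getTotalCost(). */
  static long basic(long card, float exprsCost, float matCost) {
    return (long) Math.ceil(Math.max(card, 0) * (exprsCost + matCost));
  }

  /** Saturating add; only valid for the non-negative costs used here (assumption). */
  static long satAdd(long a, long b) {
    long r = a + b;
    return r < 0 ? Long.MAX_VALUE : r;
  }

  /** Saturating multiply for non-negative inputs (assumed semantics). */
  static long satMul(long a, long b) {
    try {
      return Math.multiplyExact(a, b);
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  /** 01. AggregationNode: one basic cost per AggregateInfo, then summed. */
  static long aggregation(long inputCard, float... perAggInfoExprCosts) {
    long total = 0;
    for (float c : perAggInfoExprCosts) total = satAdd(total, basic(inputCard, c, 0f));
    return total;
  }

  /** 06. ExchangeNode receiving side; broadcast multiplies by the number of receivers. */
  static long exchange(long inputCard, float conjunctsCost, float avgSerializedRowSize,
      boolean broadcast, int numReceivers) {
    long cost = basic(inputCard, conjunctsCost, avgSerializedRowSize / 1024);
    return broadcast ? satMul(cost, numReceivers) : cost;
  }

  /** 07. HashJoinNode: returns {probe cost, build cost}. */
  static long[] hashJoin(long probeCard, long buildCard, long outputCard,
      float eqJoinCost, float otherJoinCost) {
    long probe = basic(probeCard, eqJoinCost, 0f);
    if (otherJoinCost > 0) probe = satAdd(probe, basic(outputCard, otherJoinCost, 0f));
    long build = basic(buildCard, eqJoinCost, 0f);
    return new long[] {probe, build};
  }

  /** 09. NestedLoopJoinNode whose right child is a SingularRowSrcNode: {I0, I0 * I1}. */
  static long[] nestedLoopSingularRowSrc(long probeCard, long buildCard) {
    long i0 = Math.max(probeCard, 0);
    return new long[] {i0, satMul(i0, Math.max(buildCard, 0))};
  }

  /** Join node total: the build cost belongs to the JoinBuildSink when there is one. */
  static long joinNodeCost(long[] probeAndBuild, boolean hasSeparateBuild) {
    return hasSeparateBuild ? probeAndBuild[0] : satAdd(probeAndBuild[0], probeAndBuild[1]);
  }

  /** 14. SubplanNode: C = 1 and I = left cardinality * right cardinality. */
  static long subplan(long leftCard, long rightCard) {
    return satMul(Math.max(leftCard, 0), Math.max(rightCard, 0));
  }

  /** 19. PlanRootSink: non-zero only when result spooling is enabled. */
  static long planRootSink(long card, float outputExprsCost, boolean resultSpooling) {
    return resultSpooling ? basic(card, outputExprsCost, 0f) : 0L;
  }

  public static void main(String[] args) {
    long[] hj = hashJoin(1_000, 100, 500, 1f, 2f);
    System.out.println(joinNodeCost(hj, false) + " " + joinNodeCost(hj, true)); // prints "2100 2000"
  }
}
```

Keeping the probe and build costs separate, as hashJoin does, is what lets a JoinBuildSink (item 18) take over the build cost when the join has a separate build.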
Part 2 of IMPALA-11604 will implement an algorithm that tries to adjust
the number of instances for each fragment by considering their
production-consumption ratio, and then finally returns a number
representing an ideal CPU core count required for a query to run
efficiently.
Testing:
- Pass FE tests.
Co-authored-by: Riza Suminto <ri...@cloudera.com>
Change-Id: If32dc770dfffcdd0be2b5555a789a7720952c68a
Reviewed-on: http://gerrit.cloudera.org:8080/19033
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Reviewed-by: Kurt Deschler <kd...@cloudera.com>
Reviewed-by: Riza Suminto <ri...@cloudera.com>
Tested-by: Riza Suminto <ri...@cloudera.com>
---
.../org/apache/impala/analysis/AggregateInfo.java | 9 +
.../java/org/apache/impala/analysis/SortInfo.java | 8 +
.../org/apache/impala/planner/AggregationNode.java | 11 +
.../apache/impala/planner/AnalyticEvalNode.java | 12 +
.../apache/impala/planner/BaseProcessingCost.java | 69 +++++
.../impala/planner/BroadcastProcessingCost.java | 74 +++++
.../impala/planner/CardinalityCheckNode.java | 5 +
.../java/org/apache/impala/planner/DataSink.java | 33 ++-
.../apache/impala/planner/DataSourceScanNode.java | 5 +
.../org/apache/impala/planner/DataStreamSink.java | 12 +-
.../org/apache/impala/planner/EmptySetNode.java | 10 +
.../org/apache/impala/planner/ExchangeNode.java | 54 +++-
.../org/apache/impala/planner/HBaseScanNode.java | 6 +-
.../org/apache/impala/planner/HBaseTableSink.java | 6 +
.../org/apache/impala/planner/HashJoinNode.java | 27 ++
.../org/apache/impala/planner/HdfsScanNode.java | 5 +
.../org/apache/impala/planner/HdfsTableSink.java | 8 +-
.../org/apache/impala/planner/JoinBuildSink.java | 8 +
.../java/org/apache/impala/planner/JoinNode.java | 21 ++
.../org/apache/impala/planner/KuduScanNode.java | 5 +
.../org/apache/impala/planner/KuduTableSink.java | 7 +-
.../apache/impala/planner/NestedLoopJoinNode.java | 47 ++++
.../java/org/apache/impala/planner/PlanNode.java | 71 ++++-
.../org/apache/impala/planner/PlanRootSink.java | 14 +
.../org/apache/impala/planner/ProcessingCost.java | 306 +++++++++++++++++++++
.../impala/planner/ScaledProcessingCost.java | 65 +++++
.../java/org/apache/impala/planner/ScanNode.java | 19 ++
.../java/org/apache/impala/planner/SelectNode.java | 5 +
.../apache/impala/planner/SingularRowSrcNode.java | 8 +
.../java/org/apache/impala/planner/SortNode.java | 6 +
.../org/apache/impala/planner/SubplanNode.java | 5 +
.../apache/impala/planner/SumProcessingCost.java | 61 ++++
.../java/org/apache/impala/planner/TableSink.java | 8 +-
.../java/org/apache/impala/planner/UnionNode.java | 21 ++
.../java/org/apache/impala/planner/UnnestNode.java | 7 +-
.../main/java/org/apache/impala/util/ExprUtil.java | 19 ++
36 files changed, 1033 insertions(+), 24 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
index 865d1d15b..324d64829 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
@@ -25,6 +25,8 @@ import org.apache.impala.catalog.AggregateFunction;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
+import org.apache.impala.planner.ProcessingCost;
+import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -717,4 +719,11 @@ public class AggregateInfo extends AggregateInfoBase {
@Override
public AggregateInfo clone() { return new AggregateInfo(this); }
+
+ public ProcessingCost computeProcessingCost(String label, long inputCardinality) {
+ float weight = ExprUtil.computeExprsTotalCost(getGroupingExprs())
+ + ExprUtil.computeExprsTotalCost(getAggregateExprs());
+
+ return ProcessingCost.basicCost(label, inputCardinality, weight);
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 995c686eb..e6d802ede 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -26,7 +26,9 @@ import java.util.Set;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.TreeNode;
import org.apache.impala.planner.PlanNode;
+import org.apache.impala.planner.ProcessingCost;
import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -313,4 +315,10 @@ public class SortInfo {
}
return result;
}
+
+ public ProcessingCost computeProcessingCost(String label, long inputCardinality) {
+ float weight = ExprUtil.computeExprsTotalCost(getSortExprs());
+
+ return ProcessingCost.basicCost(label, inputCardinality, weight);
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index b9741ff7b..e4eea634c 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -77,6 +77,7 @@ public class AggregationNode extends PlanNode {
private boolean useStreamingPreagg_ = false;
// Resource profiles for each aggregation class.
+ // Set in computeNodeResourceProfile().
private List<ResourceProfile> resourceProfiles_;
// Conservative minimum size of hash table for low-cardinality aggregations.
@@ -505,6 +506,16 @@ public class AggregationNode extends PlanNode {
return output;
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = ProcessingCost.zero();
+ for (AggregateInfo aggInfo : aggInfos_) {
+ ProcessingCost aggCost =
+ aggInfo.computeProcessingCost(getDisplayLabel(), getChild(0).getCardinality());
+ processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost);
+ }
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size());
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index d4fb6abf4..2ab23a643 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -357,6 +358,17 @@ public class AnalyticEvalNode extends PlanNode {
return output.toString();
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The total cost per row is the sum of the evaluation costs for analytic functions.
+ // The partition-by-equal and order-by-equal predicates ('partitionByEq_' and
+ // 'orderByEq_') are excluded since the input data stream is already partitioned and
+ // sorted within each partition (see notes on class AnalyticEvalNode in analytic-eval-node.h).
+ float totalCostToEvalOneRow = ExprUtil.computeExprsTotalCost(analyticFnCalls_);
+ processingCost_ = ProcessingCost.basicCost(
+ getDisplayLabel(), getCardinality(), totalCostToEvalOneRow);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
Preconditions.checkNotNull(
diff --git a/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
new file mode 100644
index 000000000..3f6de5ffd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/BaseProcessingCost.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+/**
+ * A basic implementation of {@link ProcessingCost} that takes into account the expression
+ * cost and average row size as per-row cost weights.
+ */
+public class BaseProcessingCost extends ProcessingCost {
+ private final long cardinality_;
+ private final float exprsCost_;
+ private final float materializationCost_;
+
+ public BaseProcessingCost(
+ long cardinality, float exprsCost, float materializationCost) {
+ // TODO: materializationCost accommodates ProcessingCosts where row width should be
+ // factored in. Currently, the ProcessingCost of ScanNode, ExchangeNode, and DataStreamSink
+ // has row width factored in through the materialization parameter here. Investigate if
+ // other operators need their row width factored in as well and whether we should
+ // have a specific 'rowWidth' parameter here.
+ cardinality_ = cardinality;
+ exprsCost_ = exprsCost;
+ materializationCost_ = materializationCost;
+ }
+
+ private float costFactor() { return exprsCost_ + materializationCost_; }
+
+ @Override
+ public long getTotalCost() {
+ // Total cost must be non-negative.
+ return (long) Math.ceil(Math.max(cardinality_, 0) * costFactor());
+ }
+
+ @Override
+ public boolean isValid() {
+ return cardinality_ >= 0;
+ }
+
+ @Override
+ public ProcessingCost clone() {
+ return new BaseProcessingCost(cardinality_, exprsCost_, materializationCost_);
+ }
+
+ @Override
+ public String getDetails() {
+ StringBuilder output = new StringBuilder();
+ output.append(super.getDetails());
+ output.append(" cardinality=")
+ .append(cardinality_)
+ .append(" cost-factor=")
+ .append(costFactor());
+ return output.toString();
+ }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
new file mode 100644
index 000000000..d031ecea6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/BroadcastProcessingCost.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.util.MathUtil;
+
+import java.util.function.Supplier;
+
+/**
+ * Similar to {@link ScaledProcessingCost}, but the multiplier (countSupplier) represents
+ * the fragment instance count associated with this ProcessingCost and may change after
+ * object construction.
+ * <p>
+ * countSupplier must always return a positive value.
+ */
+public class BroadcastProcessingCost extends ProcessingCost {
+ private final ProcessingCost childProcessingCost_;
+
+ protected BroadcastProcessingCost(
+ ProcessingCost cost, Supplier<Integer> countSupplier) {
+ Preconditions.checkArgument(
+ cost.isValid(), "BroadcastProcessingCost: cost is invalid!");
+ childProcessingCost_ = cost;
+ setNumInstanceExpected(countSupplier);
+ }
+
+ @Override
+ public long getTotalCost() {
+ return MathUtil.saturatingMultiply(
+ childProcessingCost_.getTotalCost(), getNumInstancesExpected());
+ }
+
+ @Override
+ public boolean isValid() {
+ return getNumInstancesExpected() > 0;
+ }
+
+ @Override
+ public ProcessingCost clone() {
+ return new BroadcastProcessingCost(childProcessingCost_, numInstanceSupplier_);
+ }
+
+ @Override
+ public String getExplainString(String detailPrefix, boolean fullExplain) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(detailPrefix);
+ sb.append("BroadcastCost(");
+ sb.append(getNumInstancesExpected());
+ sb.append("): ");
+ sb.append(getDetails());
+ if (fullExplain) {
+ sb.append("\n");
+ sb.append(childProcessingCost_.getExplainString(detailPrefix + " ", true));
+ }
+ return sb.toString();
+ }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
index 00cd67fdd..78d32d8ac 100644
--- a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
@@ -78,6 +78,11 @@ public class CardinalityCheckNode extends PlanNode {
msg.setCardinality_check_node(cardinalityCheckNode);
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = computeDefaultProcessingCost();
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
nodeResourceProfile_ = ResourceProfile.noReservation(0);
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSink.java b/fe/src/main/java/org/apache/impala/planner/DataSink.java
index 2cacb4d3d..4b665b4c4 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSink.java
@@ -45,6 +45,10 @@ public abstract class DataSink {
// set in computeResourceProfile()
protected ResourceProfile resourceProfile_ = ResourceProfile.invalid();
+ // A total processing cost across all instances of this data sink.
+ // Set in computeProcessingCost() for a meaningful value.
+ protected ProcessingCost processingCost_ = ProcessingCost.invalid();
+
/**
* Return an explain string for the DataSink. Each line of the explain will be prefixed
* by "prefix".
@@ -56,6 +60,19 @@ public abstract class DataSink {
if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(detailPrefix);
output.append(resourceProfile_.getExplainString());
+ if (ProcessingCost.isComputeCost(queryOptions)) {
+ // Show processing cost total.
+ output.append(" cost=");
+ if (processingCost_.isValid()) {
+ output.append(processingCost_.getTotalCost());
+ if (explainLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
+ output.append("\n");
+ output.append(processingCost_.getExplainString(detailPrefix, false));
+ }
+ } else {
+ output.append("<invalid>");
+ }
+ }
output.append("\n");
}
return output.toString();
@@ -107,15 +124,29 @@ public abstract class DataSink {
public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
public PlanFragment getFragment() { return fragment_; }
public ResourceProfile getResourceProfile() { return resourceProfile_; }
+ public ProcessingCost getProcessingCost() { return processingCost_; }
+
+ public abstract void computeProcessingCost(TQueryOptions queryOptions);
/**
* Compute the resource profile for an instance of this DataSink.
*/
public abstract void computeResourceProfile(TQueryOptions queryOptions);
+ /**
+ * Set the number-of-rows-consumed and number-of-rows-produced fields of the processing cost.
+ */
+ public void computeRowConsumptionAndProductionToCost() {
+ Preconditions.checkState(processingCost_.isValid(),
+ "Processing cost of DataSink " + fragment_.getId() + ":" + getLabel()
+ + " is invalid!");
+ long inputOutputCardinality = fragment_.getPlanRoot().getCardinality();
+ processingCost_.setNumRowToConsume(inputOutputCardinality);
+ processingCost_.setNumRowToProduce(inputOutputCardinality);
+ }
+
/**
* Collect all expressions evaluated by this data sink.
*/
public abstract void collectExprs(List<Expr> exprs);
-
}
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index c89b2760f..7cafa1840 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -332,6 +332,11 @@ public class DataSourceScanNode extends ScanNode {
new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex))));
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = computeDefaultProcessingCost();
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// This node fetches a thrift representation of the rows from the data
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index 06b8f31b0..fcf1f266e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -71,9 +71,7 @@ public class DataStreamSink extends DataSink {
private long estimateOutboundRowBatchBuffers(TQueryOptions queryOptions) {
int numChannels =
outputPartition_.isPartitioned() ? exchNode_.getFragment().getNumInstances() : 1;
- long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0 ?
- queryOptions.batch_size :
- PlanNode.DEFAULT_ROWBATCH_SIZE;
+ long rowBatchSize = PlanNode.getRowBatchSize(queryOptions);
long avgOutboundRowBatchSize = Math.min(
(long) Math.ceil(rowBatchSize * exchNode_.getAvgSerializedRowSize(exchNode_)),
PlanNode.ROWBATCH_MAX_MEM_USAGE);
@@ -86,6 +84,14 @@ public class DataStreamSink extends DataSink {
return bufferSize;
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The sending part of the processing cost for the exchange node.
+ processingCost_ =
+ ProcessingCost.basicCost(getLabel() + "(" + exchNode_.getDisplayLabel() + ")",
+ exchNode_.getCardinality(), 0, exchNode_.estimateSerializationCostPerRow());
+ }
+
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
long estimatedMem = estimateOutboundRowBatchBuffers(queryOptions);
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index b39a2ae15..02bdcff1f 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -59,6 +59,11 @@ public class EmptySetNode extends PlanNode {
computeStats(analyzer);
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = ProcessingCost.zero();
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// TODO: add an estimate
@@ -78,4 +83,9 @@ public class EmptySetNode extends PlanNode {
@Override
protected boolean displayCardinality(TExplainLevel detailLevel) { return false; }
+
+ @Override
+ protected boolean isLeafNode() {
+ return true;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index c3856956f..44232b53d 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -29,6 +29,7 @@ import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TSortInfo;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.Preconditions;
@@ -66,7 +67,7 @@ public class ExchangeNode extends PlanNode {
return mergeInfo_ != null;
}
- private boolean isBroadcastExchange() {
+ protected boolean isBroadcastExchange() {
// If the output of the sink is not partitioned but the target fragment is
// partitioned, then the data exchange is broadcast.
Preconditions.checkState(!children_.isEmpty());
@@ -178,6 +179,39 @@ public class ExchangeNode extends PlanNode {
(exchInput.getTupleIds().size() * PER_TUPLE_SERIALIZATION_OVERHEAD);
}
+ // Return the number of sending instances of this exchange.
+ public int getNumSenders() {
+ Preconditions.checkState(!children_.isEmpty());
+ Preconditions.checkNotNull(children_.get(0).getFragment());
+ return children_.get(0).getFragment().getNumInstances();
+ }
+
+ // Return the number of receiving instances of this exchange.
+ public int getNumReceivers() {
+ DataSink sink = fragment_.getSink();
+ if (sink == null) return 1;
+ return sink.getFragment().getNumInstances();
+ }
+
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The computation for the processing cost for exchange splits into two parts:
+ // 1. The sending processing cost which is computed in the DataStreamSink of the
+ // bottom sending fragment;
+ // 2. The receiving processing cost in the top receiving fragment which is computed
+ // here.
+ // Assume serialization and deserialization costs per row are equal.
+ float conjunctsCost = ExprUtil.computeExprsTotalCost(conjuncts_);
+ float materializationCost = estimateSerializationCostPerRow();
+ processingCost_ = ProcessingCost.basicCost(getDisplayLabel() + "(receiving)",
+ getChild(0).getCardinality(), conjunctsCost, materializationCost);
+
+ if (isBroadcastExchange()) {
+ processingCost_ = ProcessingCost.broadcastCost(processingCost_,
+ () -> getNumReceivers());
+ }
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// For non-merging exchanges, one row batch queue is maintained for row
@@ -193,9 +227,7 @@ public class ExchangeNode extends PlanNode {
// the system load. This makes it difficult to accurately estimate the
// memory usage at runtime. The following estimates assume that memory usage will
// lean towards the soft limits.
- Preconditions.checkState(!children_.isEmpty());
- Preconditions.checkNotNull(children_.get(0).getFragment());
- int numSenders = children_.get(0).getFragment().getNumInstances();
+ int numSenders = getNumSenders();
long estimatedTotalQueueByteSize = estimateTotalQueueByteSize(numSenders);
long estimatedDeferredRPCQueueSize = estimateDeferredRPCQueueSize(queryOptions,
numSenders);
@@ -205,12 +237,18 @@ public class ExchangeNode extends PlanNode {
nodeResourceProfile_ = ResourceProfile.noReservation(estimatedMem);
}
+ /**
+ * Estimate per-row serialization/deserialization cost as 1 per 1KB.
+ */
+ protected float estimateSerializationCostPerRow() {
+ return (float) getAvgSerializedRowSize(this) / 1024;
+ }
+
// Returns the estimated size of the deferred batch queue (in bytes) by
// assuming that at least one row batch rpc payload per sender is queued.
private long estimateDeferredRPCQueueSize(TQueryOptions queryOptions,
int numSenders) {
- long rowBatchSize = queryOptions.isSetBatch_size() && queryOptions.batch_size > 0
- ? queryOptions.batch_size : DEFAULT_ROWBATCH_SIZE;
+ long rowBatchSize = getRowBatchSize(queryOptions);
// Set an upper limit based on estimated cardinality.
if (getCardinality() > 0) rowBatchSize = Math.min(rowBatchSize, getCardinality());
long avgRowBatchByteSize = Math.min(
@@ -251,6 +289,10 @@ public class ExchangeNode extends PlanNode {
@Override
protected void toThrift(TPlanNode msg) {
+ if (processingCost_.isValid() && processingCost_ instanceof BroadcastProcessingCost) {
+ Preconditions.checkState(
+ getNumReceivers() == processingCost_.getNumInstancesExpected());
+ }
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid: tupleIds_) {
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 2b764e602..96460b14b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -59,7 +59,6 @@ import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.util.BitUtil;
-import org.apache.impala.util.ExecutorMembershipSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -655,6 +654,11 @@ public class HBaseScanNode extends ScanNode {
}
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = computeScanProcessingCost(queryOptions);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
index cfbb335e7..abeb27f09 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
@@ -54,6 +54,12 @@ public class HBaseTableSink extends TableSink {
return "HBASE WRITER";
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The processing cost to export rows.
+ processingCost_ = computeDefaultProcessingCost();
+ }
+
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
resourceProfile_ = ResourceProfile.noReservation(0);
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 0fe91eed3..7aeee7433 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -37,6 +37,7 @@ import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.util.BitUtil;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
@@ -314,4 +315,30 @@ public class HashJoinNode extends JoinNode {
.setMaxRowBufferBytes(maxRowBufferSize).build();
return Pair.create(probeProfile, buildProfile);
}
+
+ @Override
+ public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() {
+ // TODO: The cost should consider conjuncts_ as well.
+ // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs and rhs side,
+ // and 'otherJoinConjuncts_' to the resultant rows.
+ float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_);
+ float otherJoinPredicateEvalCost =
+ ExprUtil.computeExprsTotalCost(otherJoinConjuncts_);
+
+ // Compute the processing cost for lhs.
+ ProcessingCost probeProcessingCost =
+ ProcessingCost.basicCost(getDisplayLabel() + " Probe side (eqJoinConjuncts_)",
+ getChild(0).getCardinality(), eqJoinPredicateEvalCost);
+ if (otherJoinPredicateEvalCost > 0) {
+ probeProcessingCost = ProcessingCost.sumCost(probeProcessingCost,
+ ProcessingCost.basicCost(getDisplayLabel() + " Probe side(otherJoinConjuncts_)",
+ getCardinality(), otherJoinPredicateEvalCost));
+ }
+
+ // Compute the processing cost for rhs.
+ ProcessingCost buildProcessingCost =
+ ProcessingCost.basicCost(getDisplayLabel() + " Build side",
+ getChild(1).getCardinality(), eqJoinPredicateEvalCost);
+ return Pair.create(probeProcessingCost, buildProcessingCost);
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9931d5849..6ba0b3c31 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -2108,6 +2108,11 @@ public class HdfsScanNode extends ScanNode {
return output.toString();
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = computeScanProcessingCost(queryOptions);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// Update 'useMtScanNode_' before any return cases. It's used in BE.
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 76b68a1ff..399e45829 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -41,10 +41,8 @@ import org.apache.impala.thrift.TTableSinkType;
import org.apache.impala.util.BitUtil;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,6 +140,12 @@ public class HdfsTableSink extends TableSink {
externalOutputPartitionDepth_ = partitionDepth;
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The processing cost to export rows.
+ processingCost_ = computeDefaultProcessingCost();
+ }
+
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
PlanNode inputNode = fragment_.getPlanRoot();
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index c8cc056d0..feed83381 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -141,11 +141,19 @@ public class JoinBuildSink extends DataSink {
joinNode_.getFragment().getNumInstances();
}
+ public boolean isShared() { return joinNode_.canShareBuild(); }
+
@Override
protected String getLabel() {
return "JOIN BUILD";
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The processing cost of the join build side.
+ processingCost_ = joinNode_.computeJoinProcessingCost().second;
+ }
+
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
resourceProfile_ = joinNode_.computeJoinResourceProfile(queryOptions).second;
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index f31045235..02d39e991 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -909,6 +909,19 @@ public abstract class JoinNode extends PlanNode {
return result;
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ Pair<ProcessingCost, ProcessingCost> probeBuildCost = computeJoinProcessingCost();
+ if (hasSeparateBuild()) {
+ // All build processing cost is accounted for in the separate builder.
+ processingCost_ = probeBuildCost.first;
+ } else {
+ // Both build and probe processing costs are accounted for in the node.
+ processingCost_ =
+ ProcessingCost.sumCost(probeBuildCost.first, probeBuildCost.second);
+ }
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
Pair<ResourceProfile, ResourceProfile> profiles =
@@ -931,6 +944,14 @@ public abstract class JoinNode extends PlanNode {
public abstract Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
TQueryOptions queryOptions);
+ /**
+ * Helper method to compute the processing cost for the join that can be
+ * called from the builder or the join node. Returns a pair of the probe
+ * processing cost and the build processing cost.
+ * Does not modify the state of this node.
+ */
+ public abstract Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost();
+
/* Helper to return all predicates as a string. */
public String getAllPredicatesAsString(TExplainLevel level) {
return "Conjuncts=" + Expr.getExplainString(getConjuncts(), level)
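The `hasSeparateBuild()` branch in `JoinNode.computeProcessingCost()` decides where the build cost is charged. A minimal sketch of that accounting with plain `long` totals; `JoinCostAccountingSketch` is hypothetical, and returning 0 for the sink when the build is inlined is a modeling convenience (in that case no JoinBuildSink exists).

```java
// Hypothetical helper: where the join's build cost is charged.
final class JoinCostAccountingSketch {
  private JoinCostAccountingSketch() {}

  /** Cost charged to the join node itself. */
  static long joinNodeCost(long probeCost, long buildCost, boolean hasSeparateBuild) {
    // With a separate build, the JoinBuildSink carries the build cost instead.
    return hasSeparateBuild ? probeCost : probeCost + buildCost;
  }

  /** Cost charged to the separate build sink; 0 when the build is part of the node. */
  static long buildSinkCost(long buildCost, boolean hasSeparateBuild) {
    return hasSeparateBuild ? buildCost : 0L;
  }
}
```

Either way, node cost plus sink cost equals probe plus build, so the build side is counted exactly once.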
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index b65388ac6..6ec5255cb 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -395,6 +395,11 @@ public class KuduScanNode extends ScanNode {
}
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = computeScanProcessingCost(queryOptions);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// The bulk of memory used by Kudu scan node is generally utilized by the
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index eba08b5ce..52fa82d65 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -18,7 +18,6 @@
package org.apache.impala.planner;
-import java.nio.ByteBuffer;
import java.util.List;
import org.apache.impala.analysis.DescriptorTable;
@@ -94,6 +93,12 @@ public class KuduTableSink extends TableSink {
return "KUDU WRITER";
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // The processing cost to export rows.
+ processingCost_ = computeDefaultProcessingCost();
+ }
+
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
// The major chunk of memory used by this node is untracked. Part of which
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index b36cb9fc5..ecd5c03ae 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -31,6 +31,7 @@ import org.apache.impala.thrift.TNestedLoopJoinNode;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -78,6 +79,7 @@ public class NestedLoopJoinNode extends JoinNode {
@Override
public Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
TQueryOptions queryOptions) {
+ // TODO: This seems to be a bug: the total data below is not divided by numInstances_.
long perInstanceMemEstimate;
if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
|| numNodes_ == 0) {
@@ -92,6 +94,51 @@ public class NestedLoopJoinNode extends JoinNode {
return Pair.create(ResourceProfile.noReservation(0), buildProfile);
}
+ @Override
+ public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() {
+ // TODO: Make this general regardless of whether a SingularRowSrcNode exists.
+ // TODO: The cost should consider conjuncts_ as well.
+ ProcessingCost probeProcessingCost = ProcessingCost.zero();
+ ProcessingCost buildProcessingCost = ProcessingCost.zero();
+ if (getChild(1) instanceof SingularRowSrcNode) {
+ // Compute the processing cost for lhs.
+ probeProcessingCost =
+ ProcessingCost.basicCost(getDisplayLabel() + "(c0, singularRowSrc) Probe side",
+ getChild(0).getCardinality(), 0);
+
+ // Compute the processing cost for rhs.
+ buildProcessingCost = ProcessingCost.basicCost(
+ getDisplayLabel() + "(c0, singularRowSrc) Build side per probe",
+ getChild(1).getCardinality(), 0);
+ // Multiply by the number of probes
+ buildProcessingCost = ProcessingCost.scaleCost(
+ buildProcessingCost, Math.max(0, getChild(0).getCardinality()));
+ } else {
+ // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs side,
+ // and 'otherJoinConjuncts_' to the resultant rows.
+ float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_);
+ float otherJoinPredicateEvalCost =
+ ExprUtil.computeExprsTotalCost(otherJoinConjuncts_);
+
+ // Compute the processing cost for lhs.
+ probeProcessingCost = ProcessingCost.basicCost(
+ getDisplayLabel() + "(c0, non-singularRowSrc, eqJoinConjuncts_) Probe side",
+ getChild(0).getCardinality(), eqJoinPredicateEvalCost);
+
+ probeProcessingCost = ProcessingCost.sumCost(probeProcessingCost,
+ ProcessingCost.basicCost(getDisplayLabel()
+ + "(c0, non-singularRowSrc, otherJoinConjuncts_) Probe side",
+ getCardinality(), otherJoinPredicateEvalCost));
+
+ // Compute the processing cost for rhs, assuming 'eqJoinConjuncts_' will be applied
+ // to all rows from rhs side.
+ buildProcessingCost = ProcessingCost.basicCost(
+ getDisplayLabel() + "(c0, non-singularRowSrc) Build side",
+ getChild(1).getCardinality(), eqJoinPredicateEvalCost);
+ }
+ return Pair.create(probeProcessingCost, buildProcessingCost);
+ }
+
@Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
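In the SingularRowSrcNode branch of `NestedLoopJoinNode.computeJoinProcessingCost()`, the per-probe build cost is multiplied by `Math.max(0, getChild(0).getCardinality())`, so an unknown (-1) probe cardinality scales it to 0. A minimal sketch of that step; `NestedLoopScaleSketch` is hypothetical, and clamping at `Long.MAX_VALUE` follows the patch's use of `MathUtil.saturatingMultiply` in ScaledProcessingCost, reimplemented here with `Math.multiplyExact` for non-negative inputs.

```java
// Hypothetical helper for the SingularRowSrcNode scaling step.
final class NestedLoopScaleSketch {
  private NestedLoopScaleSketch() {}

  /** Multiply two non-negative longs, clamping at Long.MAX_VALUE on overflow. */
  static long saturatingMultiply(long a, long b) {
    try {
      return Math.multiplyExact(a, b);
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  /** Build cost per probe row, scaled by the number of probe rows. */
  static long scaledBuildCost(long buildCostPerProbe, long probeCardinality) {
    if (buildCostPerProbe < 0) {
      throw new IllegalArgumentException("buildCostPerProbe must be non-negative");
    }
    // An unknown probe cardinality (-1) is treated as 0 rows.
    return saturatingMultiply(buildCostPerProbe, Math.max(0L, probeCardinality));
  }
}
```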
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index ae19ce278..6dc9709c5 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -28,7 +28,6 @@ import java.util.Set;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
-import org.apache.impala.analysis.CollectionTableRef;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprId;
import org.apache.impala.analysis.ExprSubstitutionMap;
@@ -49,6 +48,7 @@ import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.util.BitUtil;
+import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,6 +154,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// Runtime filters assigned to this node.
protected List<RuntimeFilter> runtimeFilters_ = new ArrayList<>();
+ // A total processing cost across all instances of this plan node.
+ // Gets set correctly in computeProcessingCost().
+ protected ProcessingCost processingCost_ = ProcessingCost.invalid();
+
protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) {
this(id, displayName);
tupleIds_.addAll(tupleIds);
@@ -235,6 +239,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
public void setAssignedConjuncts(Set<ExprId> conjuncts) {
assignedConjuncts_ = conjuncts;
}
+ public ProcessingCost getProcessingCost() { return processingCost_; }
/**
* Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_
@@ -342,6 +347,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
expBuilder.append(nodeResourceProfile_.getExplainString());
expBuilder.append("\n");
+ if (ProcessingCost.isComputeCost(queryOptions) && processingCost_.isValid()
+ && detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
+ // Print processing cost.
+ expBuilder.append(processingCost_.getExplainString(detailPrefix, false));
+ expBuilder.append("\n");
+ }
+
// Print tuple ids, row size and cardinality.
expBuilder.append(detailPrefix + "tuple-ids=");
for (int i = 0; i < tupleIds_.size(); ++i) {
@@ -358,10 +370,19 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
if (displayCardinality) {
if (detailLevel == TExplainLevel.STANDARD) expBuilder.append(detailPrefix);
expBuilder.append("row-size=")
- .append(PrintUtils.printBytes(Math.round(avgRowSize_)))
- .append(" cardinality=")
- .append(PrintUtils.printEstCardinality(cardinality_))
- .append("\n");
+ .append(PrintUtils.printBytes(Math.round(avgRowSize_)))
+ .append(" cardinality=")
+ .append(PrintUtils.printEstCardinality(cardinality_));
+ if (ProcessingCost.isComputeCost(queryOptions)) {
+ // Show processing cost total.
+ expBuilder.append(" cost=");
+ if (processingCost_.isValid()) {
+ expBuilder.append(processingCost_.getTotalCost());
+ } else {
+ expBuilder.append("<invalid>");
+ }
+ }
+ expBuilder.append("\n");
}
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
@@ -653,6 +674,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
return cardinality;
}
+ // Default implementation of computing the weighted amount of data processed.
+ protected ProcessingCost computeDefaultProcessingCost() {
+ Preconditions.checkState(hasValidStats());
+ return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(),
+ ExprUtil.computeExprsTotalCost(getConjuncts()));
+ }
+
public static long capCardinalityAtLimit(long cardinality, long limit) {
return cardinality == -1 ? limit : Math.min(cardinality, limit);
}
@@ -882,14 +910,41 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
}
}
+ public abstract void computeProcessingCost(TQueryOptions queryOptions);
+
/**
* Compute peak resources consumed when executing this PlanNode, initializing
- * 'nodeResourceProfile_'. May only be called after this PlanNode has been placed in
- * a PlanFragment because the cost computation is dependent on the enclosing fragment's
- * data partition.
+ * 'nodeResourceProfile_' and 'processingCost_'. May only be called after this PlanNode
+ * has been placed in a PlanFragment because the cost computation is dependent on the
+ * enclosing fragment's data partition.
*/
public abstract void computeNodeResourceProfile(TQueryOptions queryOptions);
+ /**
+ * Determine whether a PlanNode is a leaf node within the plan tree.
+ * @return true if a PlanNode is a leaf node within the plan tree.
+ */
+ protected boolean isLeafNode() { return false; }
+
+ /**
+ * Set the number of rows consumed and produced in the processing cost.
+ */
+ public void computeRowConsumptionAndProductionToCost() {
+ Preconditions.checkState(processingCost_.isValid(),
+ "Processing cost of PlanNode " + getDisplayLabel() + " is invalid!");
+ processingCost_.setNumRowToConsume(getInputCardinality());
+ processingCost_.setNumRowToProduce(getCardinality());
+ }
+
+ /**
+ * Get row batch size after considering 'batch_size' query option.
+ */
+ protected static long getRowBatchSize(TQueryOptions queryOptions) {
+ return (queryOptions.isSetBatch_size() && queryOptions.batch_size > 0) ?
+ queryOptions.batch_size :
+ PlanNode.DEFAULT_ROWBATCH_SIZE;
+ }
+
/**
* Wrapper class to represent resource profiles during different phases of execution.
*/
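The PlanNode changes append a `cost=` field to the `row-size=... cardinality=...` explain line and add a `batch_size` fallback. A minimal sketch of both decisions with plain parameters; `ExplainCostSketch` is hypothetical, and the default batch size is passed in rather than assuming the value of `PlanNode.DEFAULT_ROWBATCH_SIZE`. Note that `ProcessingCost.isComputeCost()` always returns false in this patch, so the suffix is not printed yet.

```java
// Hypothetical helper mirroring the explain-string and batch-size logic.
final class ExplainCostSketch {
  private ExplainCostSketch() {}

  /** Suffix appended after "cardinality=..." when cost computation is enabled. */
  static String costSuffix(boolean computeCost, boolean costIsValid, long totalCost) {
    if (!computeCost) return "";
    return " cost=" + (costIsValid ? Long.toString(totalCost) : "<invalid>");
  }

  /** The 'batch_size' option if set and positive, otherwise the given default. */
  static long rowBatchSize(boolean batchSizeIsSet, int batchSize, long defaultBatchSize) {
    return (batchSizeIsSet && batchSize > 0) ? batchSize : defaultBatchSize;
  }
}
```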
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 53887b738..133aec186 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -26,6 +26,7 @@ import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanRootSink;
import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
@@ -67,6 +68,19 @@ public class PlanRootSink extends DataSink {
return "ROOT";
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0
+ && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) {
+ // The processing cost to buffer this many rows in the root sink.
+ processingCost_ =
+ ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(),
+ ExprUtil.computeExprsTotalCost(outputExprs_));
+ } else {
+ processingCost_ = ProcessingCost.zero();
+ }
+ }
+
/**
* Computes and sets the {@link ResourceProfile} for this PlanRootSink. If result
* spooling is disabled, a ResourceProfile is returned with no reservation or buffer
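`PlanRootSink.computeProcessingCost()` charges a nonzero cost only when results are actually spooled: `spool_query_results` is on, `scratch_limit` is not 0, and at least one scratch dir is configured. A minimal sketch with plain parameters in place of TQueryOptions and BackendConfig; `RootSinkCostSketch` is hypothetical, and D = I * C with nearest-integer rounding is an assumption.

```java
import java.util.List;

// Hypothetical helper mirroring PlanRootSink.computeProcessingCost().
final class RootSinkCostSketch {
  private RootSinkCostSketch() {}

  static long rootSinkCost(boolean spoolQueryResults, long scratchLimit,
      List<String> scratchDirs, long rootCardinality, float outputExprsCost) {
    boolean spools = spoolQueryResults && scratchLimit != 0 && !scratchDirs.isEmpty();
    if (!spools) return 0L; // Corresponds to ProcessingCost.zero().
    // Assumed: D = max(0, I) * C, rounded to the nearest integer.
    return Math.round(Math.max(0L, rootCardinality) * (double) outputExprsCost);
  }
}
```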
diff --git a/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
new file mode 100644
index 000000000..df695bddc
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.math.LongMath;
+
+import org.apache.impala.thrift.TQueryOptions;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A base class that encapsulates processing cost, which models the total cost or amount
+ * of work shared across all instances of a specific {@link PlanNode}, {@link DataSink},
+ * or {@link PlanFragment}.
+ */
+public abstract class ProcessingCost implements Cloneable {
+ public static ProcessingCost invalid() { return new BaseProcessingCost(-1, 1, 0); }
+ public static ProcessingCost zero() { return new BaseProcessingCost(0, 1, 0); }
+
+ public static ProcessingCost maxCost(ProcessingCost a, ProcessingCost b) {
+ return (a.getTotalCost() >= b.getTotalCost()) ? a : b;
+ }
+
+ public static ProcessingCost sumCost(ProcessingCost a, ProcessingCost b) {
+ return new SumProcessingCost(a, b);
+ }
+
+ public static ProcessingCost scaleCost(ProcessingCost cost, long factor) {
+ return new ScaledProcessingCost(cost, factor);
+ }
+
+ public static ProcessingCost broadcastCost(
+ ProcessingCost cost, Supplier<Integer> numInstanceSupplier) {
+ return new BroadcastProcessingCost(cost, numInstanceSupplier);
+ }
+
+ protected static void tryAdjustConsumerParallelism(int nodeStepCount,
+ int minParallelism, int maxParallelism, ProcessingCost producer,
+ ProcessingCost consumer) {
+ Preconditions.checkState(consumer.getNumInstancesExpected() > 0);
+ Preconditions.checkState(producer.getNumInstancesExpected() > 0);
+ if (producer.getCostPerRowProduced() > 0
+ && (consumer.canReducedBy(nodeStepCount, minParallelism, producer)
+ || (consumer.canIncreaseBy(nodeStepCount, maxParallelism, producer)))) {
+ // Adjust consumer's concurrency following producer's parallelism and their
+ // produce-consume rate ratio.
+ float consProdRatio = consumer.consumerProducerRatio(producer);
+ int adjustedCount = (int) Math.ceil(consProdRatio
+ * producer.getNumInstancesExpected() / nodeStepCount)
+ * nodeStepCount;
+ final int finalCount =
+ Math.max(minParallelism, Math.min(maxParallelism, adjustedCount));
+ consumer.setNumInstanceExpected(() -> finalCount);
+ } else if (maxParallelism < consumer.getNumInstancesExpected()) {
+ consumer.setNumInstanceExpected(() -> maxParallelism);
+ }
+ }
+
+ private static ProcessingCost computeValidBaseCost(
+ long cardinality, float exprsCost, float materializationCost) {
+ return new BaseProcessingCost(
+ Math.max(0, cardinality), exprsCost, materializationCost);
+ }
+
+ public static ProcessingCost basicCost(
+ String label, long cardinality, float exprsCost, float materializationCost) {
+ ProcessingCost processingCost =
+ computeValidBaseCost(cardinality, exprsCost, materializationCost);
+ processingCost.setLabel(label);
+ return processingCost;
+ }
+
+ public static ProcessingCost basicCost(
+ String label, long cardinality, float exprsCost) {
+ ProcessingCost processingCost = computeValidBaseCost(cardinality, exprsCost, 0);
+ processingCost.setLabel(label);
+ return processingCost;
+ }
+
+ public static boolean isComputeCost(TQueryOptions queryOptions) {
+ // TODO: Replace with proper check in IMPALA-11604 part 2.
+ return false;
+ }
+
+ /**
+ * Merge multiple ProcessingCosts into a single new ProcessingCost.
+ * <p>
+ * The resulting ProcessingCost will have the total cost, number of rows produced,
+ * and number of rows consumed as a sum of the respective properties of all
+ * ProcessingCosts in the given list. Meanwhile, the number of instances expected is the
+ * maximum among all ProcessingCosts in the list.
+ *
+ * @param costs list of all ProcessingCost to merge.
+ * @return A new combined ProcessingCost.
+ */
+ protected static ProcessingCost fullMergeCosts(List<ProcessingCost> costs) {
+ Preconditions.checkNotNull(costs);
+ Preconditions.checkArgument(!costs.isEmpty());
+
+ ProcessingCost resultingCost = ProcessingCost.zero();
+ long inputCardinality = 0;
+ long outputCardinality = 0;
+ int maxProducerParallelism = 1;
+ for (ProcessingCost cost : costs) {
+ resultingCost = ProcessingCost.sumCost(resultingCost, cost);
+ inputCardinality += cost.getNumRowToConsume();
+ outputCardinality += cost.getNumRowToProduce();
+ maxProducerParallelism =
+ Math.max(maxProducerParallelism, cost.getNumInstancesExpected());
+ }
+ resultingCost.setNumRowToConsume(inputCardinality);
+ resultingCost.setNumRowToProduce(outputCardinality);
+ final int finalProducerParallelism = maxProducerParallelism;
+ resultingCost.setNumInstanceExpected(() -> finalProducerParallelism);
+ return resultingCost;
+ }
+
+ protected Supplier<Integer> numInstanceSupplier_ = null;
+ private long numRowToProduce_ = 0;
+ private long numRowToConsume_ = 0;
+ private String label_ = null;
+ private boolean isSetNumRowToProduce_ = false;
+ private boolean isSetNumRowToConsume_ = false;
+
+ public abstract long getTotalCost();
+
+ public abstract boolean isValid();
+
+ public abstract ProcessingCost clone();
+
+ public String getDetails() {
+ StringBuilder output = new StringBuilder();
+ output.append("cost-total=")
+ .append(getTotalCost())
+ .append(" max-instances=")
+ .append(getNumInstanceMax());
+ if (hasAdjustedInstanceCount()) {
+ output.append(" adj-instances=").append(getNumInstancesExpected());
+ }
+ output.append(" cost/inst=")
+ .append(getPerInstanceCost())
+ .append(" #cons:#prod=")
+ .append(numRowToConsume_)
+ .append(":")
+ .append(numRowToProduce_);
+ if (isSetNumRowToConsume_ && isSetNumRowToProduce_) {
+ output.append(" reduction=").append(getReduction());
+ }
+ if (isSetNumRowToConsume_) {
+ output.append(" cost/cons=").append(getCostPerRowConsumed());
+ }
+ if (isSetNumRowToProduce_) {
+ output.append(" cost/prod=").append(getCostPerRowProduced());
+ }
+ return output.toString();
+ }
+
+ public String debugString() {
+ StringBuilder output = new StringBuilder();
+ if (label_ != null) {
+ output.append(label_);
+ output.append("=");
+ }
+ output.append(this);
+ return output.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "{" + getDetails() + "}";
+ }
+
+ public String getExplainString(String detailPrefix, boolean fullExplain) {
+ return detailPrefix + getDetails();
+ }
+
+ public void setNumInstanceExpected(Supplier<Integer> countSupplier) {
+ Preconditions.checkArgument(
+ countSupplier.get() > 0, "Number of instances must be greater than 0!");
+ numInstanceSupplier_ = countSupplier;
+ }
+
+ public int getNumInstancesExpected() {
+ return hasAdjustedInstanceCount() ? numInstanceSupplier_.get() : getNumInstanceMax();
+ }
+
+ private boolean hasAdjustedInstanceCount() {
+ return numInstanceSupplier_ != null && numInstanceSupplier_.get() > 0;
+ }
+
+ private int getNumInstanceMax() {
+ // TODO: replace minProcessingCostPerThread with backend flag.
+ long minProcessingCostPerThread = 10000000L;
+ long maxInstance = LongMath.divide(getTotalCost(),
+ minProcessingCostPerThread, RoundingMode.CEILING);
+ if (maxInstance > 0) {
+ return maxInstance < Integer.MAX_VALUE ? (int) maxInstance : Integer.MAX_VALUE;
+ } else {
+ return 1;
+ }
+ }
+
+ /**
+ * Set num rows to produce.
+ *
+ * @param numRowToProduce Number of rows to be produced by the plan node or data sink
+ * associated with this cost. Assume 0 rows if a negative value is given.
+ */
+ public void setNumRowToProduce(long numRowToProduce) {
+ numRowToProduce_ = Math.max(0, numRowToProduce);
+ isSetNumRowToProduce_ = true;
+ }
+
+ /**
+ * Set num rows to consume.
+ *
+ * @param numRowToConsume Number of rows to be consumed by the plan node or data sink
+ * associated with this cost. Assume 0 rows if a negative value is given.
+ */
+ protected void setNumRowToConsume(long numRowToConsume) {
+ numRowToConsume_ = Math.max(0, numRowToConsume);
+ isSetNumRowToConsume_ = true;
+ }
+
+ public void setLabel(String label) { label_ = label; }
+ public long getNumRowToConsume() { return numRowToConsume_; }
+ public long getNumRowToProduce() { return numRowToProduce_; }
+
+ private int getPerInstanceCost() {
+ Preconditions.checkState(getNumInstancesExpected() > 0);
+ return (int) Math.ceil((float) getTotalCost() / getNumInstancesExpected());
+ }
+
+ private float getReduction() {
+ return (float) numRowToConsume_ / Math.max(1, numRowToProduce_);
+ }
+
+ private float getCostPerRowProduced() {
+ return (float) getTotalCost() / Math.max(1, numRowToProduce_);
+ }
+
+ private float getCostPerRowConsumed() {
+ return (float) getTotalCost() / Math.max(1, numRowToConsume_);
+ }
+
+ private float instanceRatio(ProcessingCost other) {
+ Preconditions.checkState(getNumInstancesExpected() > 0);
+ return (float) getNumInstancesExpected() / other.getNumInstancesExpected();
+ }
+
+ private float consumerProducerRatio(ProcessingCost other) {
+ return getCostPerRowConsumed() / Math.max(1, other.getCostPerRowProduced());
+ }
+
+ private boolean isAtLowestInstanceRatio(
+ int nodeStepCount, int minParallelism, ProcessingCost other) {
+ if (getNumInstancesExpected() - nodeStepCount < minParallelism) {
+ return true;
+ } else {
+ float lowerRatio = (float) (getNumInstancesExpected() - nodeStepCount)
+ / other.getNumInstancesExpected();
+ return lowerRatio < consumerProducerRatio(other);
+ }
+ }
+
+ private boolean isAtHighestInstanceRatio(
+ int nodeStepCount, int maxInstance, ProcessingCost other) {
+ if (getNumInstancesExpected() + nodeStepCount > maxInstance) {
+ return true;
+ } else {
+ float higherRatio = (float) (getNumInstancesExpected() + nodeStepCount)
+ / other.getNumInstancesExpected();
+ return higherRatio > consumerProducerRatio(other);
+ }
+ }
+
+ private boolean canReducedBy(
+ int nodeStepCount, int minParallelism, ProcessingCost other) {
+ return !isAtLowestInstanceRatio(nodeStepCount, minParallelism, other)
+ && consumerProducerRatio(other) < instanceRatio(other);
+ }
+
+ private boolean canIncreaseBy(
+ int nodeStepCount, int maxInstance, ProcessingCost other) {
+ return !isAtHighestInstanceRatio(nodeStepCount, maxInstance, other)
+ && consumerProducerRatio(other) > instanceRatio(other);
+ }
+}
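Two pieces of ProcessingCost arithmetic drive parallelism: `getNumInstanceMax()` takes the ceiling of the total cost over a hard-coded 10,000,000 per thread, clamped to at least 1, and `tryAdjustConsumerParallelism()` rounds `consProdRatio * producerInstances` up to a multiple of `nodeStepCount` and clamps it to [min, max]. A minimal sketch of those formulas only (it omits the `canReducedBy`/`canIncreaseBy` guards that decide whether an adjustment happens at all); `ParallelismSketch` is hypothetical and replaces Guava's `LongMath.divide(..., CEILING)` with an equivalent `Math.floorDiv` form.

```java
// Hypothetical helper mirroring the instance-count formulas in ProcessingCost.
final class ParallelismSketch {
  private ParallelismSketch() {}

  // Hard-coded as minProcessingCostPerThread in getNumInstanceMax().
  static final long MIN_PROCESSING_COST_PER_THREAD = 10_000_000L;

  /** ceil(totalCost / 10M), clamped to [1, Integer.MAX_VALUE]. */
  static int maxInstances(long totalCost) {
    // Ceiling division for a positive divisor: ceil(x / d) == -floorDiv(-x, d).
    long n = -Math.floorDiv(-totalCost, MIN_PROCESSING_COST_PER_THREAD);
    if (n <= 0) return 1;
    return n < Integer.MAX_VALUE ? (int) n : Integer.MAX_VALUE;
  }

  /** Consumer cost per consumed row over producer cost per produced row (floored at 1). */
  static float consumerProducerRatio(long consumerCost, long rowsConsumed,
      long producerCost, long rowsProduced) {
    float consumerCostPerRow = (float) consumerCost / Math.max(1L, rowsConsumed);
    float producerCostPerRow = (float) producerCost / Math.max(1L, rowsProduced);
    return consumerCostPerRow / Math.max(1f, producerCostPerRow);
  }

  /** Producer instances scaled by the ratio, rounded up to a step multiple, clamped. */
  static int adjustedConsumerCount(float consProdRatio, int producerInstances,
      int nodeStepCount, int minParallelism, int maxParallelism) {
    int adjusted = (int) Math.ceil(consProdRatio * producerInstances / nodeStepCount)
        * nodeStepCount;
    return Math.max(minParallelism, Math.min(maxParallelism, adjusted));
  }
}
```

For example, a consumer whose per-row cost is 1.5x the producer's, fed by 4 producer instances with a step of 3, gets ceil(6 / 3) * 3 = 6 instances.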
diff --git a/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
new file mode 100644
index 000000000..421ee9e64
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ScaledProcessingCost.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.util.MathUtil;
+
+public class ScaledProcessingCost extends ProcessingCost {
+ private final ProcessingCost cost_;
+ private final long multiplier_;
+
+ protected ScaledProcessingCost(ProcessingCost cost, long multiplier) {
+ Preconditions.checkArgument(cost.isValid(), "ScaledProcessingCost: cost is invalid!");
+ Preconditions.checkArgument(
+ multiplier >= 0, "ScaledProcessingCost: multiplier must be non-negative!");
+ cost_ = cost;
+ multiplier_ = multiplier;
+ }
+
+ @Override
+ public long getTotalCost() {
+ return MathUtil.saturatingMultiply(cost_.getTotalCost(), multiplier_);
+ }
+
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public ProcessingCost clone() {
+ return new ScaledProcessingCost(cost_, multiplier_);
+ }
+
+ @Override
+ public String getExplainString(String detailPrefix, boolean fullExplain) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(detailPrefix);
+ sb.append("ScaledCost(");
+ sb.append(multiplier_);
+ sb.append("): ");
+ sb.append(getDetails());
+ if (fullExplain) {
+ sb.append("\n");
+ sb.append(cost_.getExplainString(detailPrefix + " ", true));
+ }
+ return sb.toString();
+ }
+}
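`sumCost()`, `scaleCost()` and `maxCost()` build a tree of ProcessingCost objects, and ScaledProcessingCost computes its total on demand from its child. A minimal sketch of that composite pattern with a hypothetical `CostNode` interface and `CompositeCostSketch` factory; SumProcessingCost and BaseProcessingCost are not in this excerpt, so nearest-integer rounding for the base cost and saturating addition for sums are assumptions.

```java
// Hypothetical composite-cost sketch; not the Impala classes.
interface CostNode {
  long total();
}

final class CompositeCostSketch {
  private CompositeCostSketch() {}

  /** Leaf: assumed D = max(0, I) * (C + M), rounded to the nearest integer. */
  static CostNode base(long cardinality, float exprsCost, float materializationCost) {
    double perRow = (double) exprsCost + materializationCost;
    long d = Math.round(Math.max(0L, cardinality) * perRow);
    return () -> d;
  }

  /** Sum node, evaluated lazily; saturation is an assumption. */
  static CostNode sum(CostNode a, CostNode b) {
    return () -> saturatingAdd(a.total(), b.total());
  }

  /** Scaled node, evaluated lazily like ScaledProcessingCost.getTotalCost(). */
  static CostNode scale(CostNode cost, long factor) {
    if (factor < 0) throw new IllegalArgumentException("factor must be non-negative");
    return () -> saturatingMultiply(cost.total(), factor);
  }

  /** Like maxCost(): returns whichever input has the larger total (ties pick a). */
  static CostNode max(CostNode a, CostNode b) {
    return a.total() >= b.total() ? a : b;
  }

  // Both helpers assume non-negative inputs and clamp at Long.MAX_VALUE.
  private static long saturatingAdd(long a, long b) {
    try {
      return Math.addExact(a, b);
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }

  private static long saturatingMultiply(long a, long b) {
    try {
      return Math.multiplyExact(a, b);
    } catch (ArithmeticException overflow) {
      return Long.MAX_VALUE;
    }
  }
}
```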
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index d250eb8db..be12b293c 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -40,6 +40,7 @@ import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
@@ -343,6 +344,24 @@ abstract public class ScanNode extends PlanNode {
}
return maxScannerThreads;
}
+
+ protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) {
+ return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(),
+ ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost());
+ }
+
+ /**
+ * Estimate the per-row materialization cost as 1 unit per 1KB of average row size.
+ * <p>
+ * This reflects the deserialization/copy cost per row.
+ */
+ private float rowMaterializationCost() { return getAvgRowSize() / 1024; }
+
+ @Override
+ protected boolean isLeafNode() {
+ return true;
+ }
+
/**
* Returns true if this node has conjuncts to be evaluated by Impala against the scan
* tuple.
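`ScanNode.computeScanProcessingCost()` is the one place in this excerpt where M is nonzero: M = avgRowSize / 1024. A worked sketch, assuming conjuncts with a total evaluation weight of 2 and nearest-integer rounding; `ScanCostSketch` is hypothetical. For 1,000,000 input rows of 512 bytes, M = 0.5 and D = 1,000,000 * (2 + 0.5) = 2,500,000.

```java
// Hypothetical helper mirroring ScanNode's scan cost formula.
final class ScanCostSketch {
  private ScanCostSketch() {}

  /** M: 1 unit per 1KB of average row size, as in rowMaterializationCost(). */
  static float rowMaterializationCost(float avgRowSizeBytes) {
    return avgRowSizeBytes / 1024;
  }

  /** D = I * (C + M); rounding to the nearest integer is an assumption. */
  static long scanCost(long inputCardinality, float conjunctsCost, float avgRowSizeBytes) {
    double perRow = (double) conjunctsCost + rowMaterializationCost(avgRowSizeBytes);
    return Math.round(Math.max(0L, inputCardinality) * perRow);
  }
}
```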
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index fa888f47f..1bde0d41a 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -117,6 +117,11 @@ public class SelectNode extends PlanNode {
public void setSelectivity(double value) { selectivity_ = value; }
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = computeDefaultProcessingCost();
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// The select node initializes a single row-batch which it recycles on every
diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
index e9c498417..f37b67c11 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
@@ -23,6 +23,7 @@ import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.Preconditions;
@@ -66,6 +67,13 @@ public class SingularRowSrcNode extends PlanNode {
numInstances_ = containingSubplanNode_.getChild(0).getNumInstances();
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = ProcessingCost.basicCost(getDisplayLabel(),
+ containingSubplanNode_.getChild(0).getCardinality(),
+ ExprUtil.computeExprsTotalCost(getConjuncts()));
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// TODO: add an estimate
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 5f12dee45..6d8ad8b03 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -453,6 +453,12 @@ public class SortNode extends PlanNode {
return offset_ != 0 ? prefix + "offset: " + Long.toString(offset_) + "\n" : "";
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ =
+ getSortInfo().computeProcessingCost(getDisplayLabel(), getCardinality());
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
Preconditions.checkState(hasValidStats());
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index eb57df628..09a6ac7d8 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -96,6 +96,11 @@ public class SubplanNode extends PlanNode {
cardinality_ = capCardinalityAtLimit(cardinality_);
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), getCardinality(), 0);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// TODO: add an estimate
diff --git a/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
new file mode 100644
index 000000000..fcbd6dfab
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/SumProcessingCost.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.util.MathUtil;
+
+public class SumProcessingCost extends ProcessingCost {
+ private final ProcessingCost cost1_;
+ private final ProcessingCost cost2_;
+
+ protected SumProcessingCost(ProcessingCost cost1, ProcessingCost cost2) {
+ Preconditions.checkArgument(cost1.isValid(), "SumProcessingCost: cost1 is invalid!");
+ Preconditions.checkArgument(cost2.isValid(), "SumProcessingCost: cost2 is invalid!");
+ cost1_ = cost1;
+ cost2_ = cost2;
+ }
+
+ @Override
+ public long getTotalCost() {
+ return MathUtil.saturatingAdd(cost1_.getTotalCost(), cost2_.getTotalCost());
+ }
+
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public ProcessingCost clone() {
+ return new SumProcessingCost(cost1_, cost2_);
+ }
+
+ @Override
+ public String getExplainString(String detailPrefix, boolean fullExplain) {
+ StringBuilder output = new StringBuilder();
+ output.append(detailPrefix).append("SumCost: ").append(getDetails());
+ if (fullExplain) {
+ String nextPrefix = detailPrefix + " ";
+ output.append("\n").append(cost1_.getExplainString(nextPrefix, true));
+ output.append("\n").append(cost2_.getExplainString(nextPrefix, true));
+ }
+ return output.toString();
+ }
+}
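SumProcessingCost forms a binary tree of costs whose total is computed with a saturating add, so a very large plan clamps at Long.MAX_VALUE instead of wrapping to a negative number. The sketch below models that composition with hypothetical stand-in classes (CostSketch, LeafCost, SumCost are not Impala types). Its saturatingAdd is a local helper with the semantics assumed for MathUtil.saturatingAdd. Guava's LongMath.saturatedAdd offers equivalent behavior.

```java
// Hypothetical stand-ins for ProcessingCost / SumProcessingCost.
abstract class CostSketch {
  abstract long total();

  // Adds two longs, clamping to Long.MAX_VALUE / Long.MIN_VALUE on overflow.
  static long saturatingAdd(long a, long b) {
    long r = a + b;
    // Overflow occurred iff a and b share a sign and r has the opposite sign.
    if (((a ^ r) & (b ^ r)) < 0) return r < 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
    return r;
  }
}

// A leaf cost with a fixed total, e.g. one AggregateInfo's cost.
final class LeafCost extends CostSketch {
  private final long cost;
  LeafCost(long cost) { this.cost = cost; }
  @Override long total() { return cost; }
}

// Sum of two sub-costs; nesting SumCost builds an arbitrary cost tree.
final class SumCost extends CostSketch {
  private final CostSketch left, right;
  SumCost(CostSketch left, CostSketch right) { this.left = left; this.right = right; }
  @Override long total() { return saturatingAdd(left.total(), right.total()); }
}
```

Note that clone() in the patch builds a new SumProcessingCost around the same cost1_ and cost2_ objects, so it is a shallow copy; that is safe only as long as the child costs are treated as immutable.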
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index cc12bd995..0f34ddb20 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -17,7 +17,6 @@
package org.apache.impala.planner;
-import java.nio.ByteBuffer;
import java.util.List;
import org.apache.impala.analysis.Expr;
@@ -28,6 +27,7 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TSinkAction;
import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.util.ExprUtil;
import com.google.common.base.Preconditions;
@@ -160,4 +160,10 @@ public abstract class TableSink extends DataSink {
"Cannot create data sink into table of type: " + table.getClass().getName());
}
}
+
+ protected ProcessingCost computeDefaultProcessingCost() {
+ // TODO: consider including the materialization cost in the returned cost.
+ return ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(),
+ ExprUtil.computeExprsTotalCost(outputExprs_));
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index b1dbe7212..c8f45dd7b 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -33,6 +33,7 @@ import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TUnionNode;
+import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,6 +149,19 @@ public class UnionNode extends PlanNode {
}
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ // Compute the cost of materializing rows from non-pass-through children and use it
+ // to estimate the total data processed. Pass-through rows are assumed to cost 0.
+ float totalMaterializeCost = 0;
+ for (int i = firstMaterializedChildIdx_; i < resultExprLists_.size(); i++) {
+ totalMaterializeCost += ExprUtil.computeExprsTotalCost(resultExprLists_.get(i));
+ }
+
+ processingCost_ =
+ ProcessingCost.basicCost(getDisplayLabel(), cardinality_, totalMaterializeCost);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// The union node directly returns the rows for children marked as pass
@@ -375,4 +389,11 @@ public class UnionNode extends PlanNode {
}
}
}
+
+ @Override
+ protected boolean isLeafNode() {
+ // Union nodes are scheduled the same way as scan nodes.
+ // See Scheduler::CreateCollocatedAndScanInstances() in scheduler.cc.
+ return true;
+ }
}
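In UnionNode.computeProcessingCost(), only children from firstMaterializedChildIdx_ onward contribute expression cost; earlier children are pass-through and treated as free. The sketch below reproduces that loop under a simplifying assumption: each child is represented only by the number of result expressions it evaluates (weight 1 each). UnionCostSketch and its methods are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch of the union cost computation; not an Impala class.
final class UnionCostSketch {
  private UnionCostSketch() {}

  // Sums the per-row expression cost of materialized children only.
  static float materializeCost(List<Integer> resultExprCounts, int firstMaterializedChildIdx) {
    float total = 0;
    for (int i = firstMaterializedChildIdx; i < resultExprCounts.size(); i++) {
      total += resultExprCounts.get(i);
    }
    return total;
  }

  // D = I * C, with I the union's output cardinality and no materialization term.
  static double weightedData(long unionCardinality, float materializeCost) {
    return unionCardinality * (double) materializeCost;
  }
}
```

With three children of 3 result expressions each, the first of them pass-through, C = 6. Because the patch multiplies C by the union's total cardinality_, rows from pass-through children still appear in I, so the estimate appears to err on the high side.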
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index dd6eb33f5..919d849e3 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -17,7 +17,6 @@
package org.apache.impala.planner;
-import java.util.Comparator;
import java.util.List;
import org.apache.impala.analysis.Analyzer;
@@ -96,6 +95,12 @@ public class UnnestNode extends PlanNode {
cardinality_ = capCardinalityAtLimit(cardinality_);
}
+ @Override
+ public void computeProcessingCost(TQueryOptions queryOptions) {
+ processingCost_ = ProcessingCost.basicCost(
+ getDisplayLabel(), containingSubplanNode_.getChild(0).getCardinality(), 0);
+ }
+
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// TODO: add an estimate
diff --git a/fe/src/main/java/org/apache/impala/util/ExprUtil.java b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
index 934fabd0b..2b34835da 100644
--- a/fe/src/main/java/org/apache/impala/util/ExprUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
@@ -27,9 +27,12 @@ import org.apache.impala.analysis.StringLiteral;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TColumnValue;
+import java.util.List;
+
public class ExprUtil {
/**
* Converts a UTC timestamp to UNIX microseconds.
@@ -102,4 +105,20 @@ public class ExprUtil {
toUtcTimestamp.analyze(analyzer);
return toUtcTimestamp;
}
+
+ // Compute total cost for a list of expressions. Return 0 for a null list.
+ public static float computeExprsTotalCost(List<? extends Expr> exprs) {
+ // TODO: Implement the cost for conjuncts once the implementation for
+ // 'Expr' is in place.
+ if (exprs == null) return 0;
+ return exprs.size();
+ }
+
+ public static float computeExprCost(Expr e) {
+ // TODO: Implement a function that takes data types, expressions and potentially
+ // LLVM translation in the BE into consideration. The function must also run
+ // fast.
+ if (e == null) return 0;
+ return 1;
+ }
}
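In this patch computeExprsTotalCost() returns exprs.size() while computeExprCost() returns a constant 1, so the two agree only because every weight is 1. One possible refactor, sketched below as a suggestion and not as the patch's code, derives the total from the per-expression cost so that a future type-aware computeExprCost() would automatically flow into every node's C. ExprCostSketch is a hypothetical name, and plain Object stands in for Impala's Expr.

```java
import java.util.List;

// Hypothetical sketch; Object stands in for org.apache.impala.analysis.Expr.
final class ExprCostSketch {
  private ExprCostSketch() {}

  // Per-expression weight: constant 1 for now, 0 for a null expression.
  static float exprCost(Object expr) {
    return expr == null ? 0 : 1;
  }

  // Total cost as the sum of per-expression costs; 0 for a null list.
  static float exprsTotalCost(List<?> exprs) {
    if (exprs == null) return 0;
    float total = 0;
    for (Object e : exprs) total += exprCost(e);
    return total;
  }
}
```

With constant weights this returns the same value as exprs.size() for lists without null elements, so it would not change any cost computed by this patch.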