You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/04/19 01:20:32 UTC
[5/6] incubator-impala git commit: IMPALA-3748: minimum buffer
requirements in planner
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 978e0ac..d5c72a0 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -65,6 +65,10 @@ import com.google.common.math.LongMath;
abstract public class PlanNode extends TreeNode<PlanNode> {
private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);
+ // The size of each buffer used by spilling nodes. Used in computeResourceProfile().
+ // TODO: IMPALA-3200: get from query option
+ protected final static long SPILLABLE_BUFFER_BYTES = 8L * 1024L * 1024L;
+
// String used for this node in getExplainString().
protected String displayName_;
@@ -110,13 +114,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// set in computeStats(); invalid: -1
protected int numNodes_;
+ // resource requirements and estimates for this plan node.
+ // set in computeResourceProfile().
+ protected ResourceProfile resourceProfile_ = null;
+
// sum of tupleIds_' avgSerializedSizes; set in computeStats()
protected float avgRowSize_;
- // estimated per-host memory requirement for this node;
- // set in computeCosts(); invalid: -1
- protected long perHostMemCost_ = -1;
-
// If true, disable codegen for this plan node.
protected boolean disableCodegen_;
@@ -187,9 +191,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
}
public long getLimit() { return limit_; }
public boolean hasLimit() { return limit_ > -1; }
- public long getPerHostMemCost() { return perHostMemCost_; }
public long getCardinality() { return cardinality_; }
public int getNumNodes() { return numNodes_; }
+ public ResourceProfile getResourceProfile() { return resourceProfile_; }
public float getAvgRowSize() { return avgRowSize_; }
public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
public PlanFragment getFragment() { return fragment_; }
@@ -235,8 +239,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
conjuncts_.clear();
}
- public String getExplainString() {
- return getExplainString("", "", TExplainLevel.VERBOSE);
+ public String getExplainString(TQueryOptions queryOptions) {
+ return getExplainString("", "", queryOptions, TExplainLevel.VERBOSE);
}
protected void setDisplayName(String s) { displayName_ = s; }
@@ -269,7 +273,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
* output will be prefixed by prefix.
*/
protected final String getExplainString(String rootPrefix, String prefix,
- TExplainLevel detailLevel) {
+ TQueryOptions queryOptions, TExplainLevel detailLevel) {
StringBuilder expBuilder = new StringBuilder();
String detailPrefix = prefix;
String filler;
@@ -302,11 +306,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// Output cardinality, cost estimates and tuple Ids only when explain plan level
// is extended or above.
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
- // Print estimated output cardinality and memory cost.
- expBuilder.append(PrintUtils.printHosts(detailPrefix, numNodes_));
- expBuilder.append(PrintUtils.printMemCost(" ", perHostMemCost_) + "\n");
+ // Print resource profile.
+ expBuilder.append(detailPrefix);
+ expBuilder.append(resourceProfile_.getExplainString());
+ expBuilder.append("\n");
- // Print tuple ids and row size.
+ // Print tuple ids, row size and cardinality.
expBuilder.append(detailPrefix + "tuple-ids=");
for (int i = 0; i < tupleIds_.size(); ++i) {
TupleId tupleId = tupleIds_.get(i);
@@ -331,15 +336,23 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// we're crossing a fragment boundary
expBuilder.append(
child.fragment_.getExplainString(
- childHeadlinePrefix, childDetailPrefix, detailLevel));
+ childHeadlinePrefix, childDetailPrefix, queryOptions, detailLevel));
} else {
- expBuilder.append(
- child.getExplainString(childHeadlinePrefix, childDetailPrefix,
- detailLevel));
+ expBuilder.append(child.getExplainString(childHeadlinePrefix,
+ childDetailPrefix, queryOptions, detailLevel));
}
if (printFiller) expBuilder.append(filler + "\n");
}
- expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel));
+ PlanFragment childFragment = children_.get(0).fragment_;
+ if (fragment_ != childFragment && detailLevel == TExplainLevel.EXTENDED) {
+ // we're crossing a fragment boundary - print the fragment header.
+ expBuilder.append(prefix);
+ expBuilder.append(
+ childFragment.getFragmentHeaderString(queryOptions.getMt_dop()));
+ expBuilder.append("\n");
+ }
+ expBuilder.append(
+ children_.get(0).getExplainString(prefix, prefix, queryOptions, detailLevel));
}
return expBuilder.toString();
}
@@ -379,7 +392,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
TExecStats estimatedStats = new TExecStats();
estimatedStats.setCardinality(cardinality_);
- estimatedStats.setMemory_used(perHostMemCost_);
+ estimatedStats.setMemory_used(resourceProfile_.getMemEstimateBytes());
msg.setLabel(getDisplayLabel());
msg.setLabel_detail(getDisplayLabelDetail());
msg.setEstimated_stats(estimatedStats);
@@ -605,13 +618,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
public boolean isBlockingNode() { return false; }
/**
- * Estimates the cost of executing this PlanNode. Currently only sets perHostMemCost_.
- * May only be called after this PlanNode has been placed in a PlanFragment because
- * the cost computation is dependent on the enclosing fragment's data partition.
+ * Compute resources consumed when executing this PlanNode, initializing
+ * 'resourceProfile_'. May only be called after this PlanNode has been placed in a
+ * PlanFragment because the cost computation is dependent on the enclosing fragment's
+ * data partition.
*/
- public void computeCosts(TQueryOptions queryOptions) {
- perHostMemCost_ = 0;
- }
+ public abstract void computeResourceProfile(TQueryOptions queryOptions);
/**
* The input cardinality is the sum of output cardinalities of its children.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index a199f54..fba9149 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryOptions;
/**
* Sink for the root of a query plan that produces result rows. Allows coordination
@@ -28,9 +29,15 @@ import org.apache.impala.thrift.TExplainLevel;
*/
public class PlanRootSink extends DataSink {
- public String getExplainString(String prefix, String detailPrefix,
- TExplainLevel explainLevel) {
- return String.format("%sPLAN-ROOT SINK\n", prefix);
+ public void appendSinkExplainString(String prefix, String detailPrefix,
+ TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
+ output.append(String.format("%sPLAN-ROOT SINK\n", prefix));
+ }
+
+ @Override
+ public void computeResourceProfile(TQueryOptions queryOptions) {
+ // TODO: add a memory estimate
+ resourceProfile_ = new ResourceProfile(0, 0);
}
protected TDataSink toThrift() {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 8842c9c..ad5bba5 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -40,6 +40,7 @@ import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryExecRequest;
+import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.MaxRowsProcessedVisitor;
@@ -266,11 +267,14 @@ public class Planner {
TQueryExecRequest request, TExplainLevel explainLevel) {
StringBuilder str = new StringBuilder();
boolean hasHeader = false;
- if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) {
- str.append(
- String.format("Estimated Per-Host Requirements: Memory=%s VCores=%s\n",
- PrintUtils.printBytes(request.getPer_host_mem_req()),
- request.per_host_vcores));
+ if (request.isSetPer_host_min_reservation()) {
+ str.append(String.format("Per-Host Resource Reservation: Memory=%s\n",
+ PrintUtils.printBytes(request.getPer_host_min_reservation()))) ;
+ hasHeader = true;
+ }
+ if (request.isSetPer_host_mem_estimate()) {
+ str.append(String.format("Per-Host Resource Estimates: Memory=%s\n",
+ PrintUtils.printBytes(request.getPer_host_mem_estimate())));
hasHeader = true;
}
@@ -324,12 +328,12 @@ public class Planner {
if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
// Print the non-fragmented parallel plan.
- str.append(fragments.get(0).getExplainString(explainLevel));
+ str.append(fragments.get(0).getExplainString(ctx_.getQueryOptions(), explainLevel));
} else {
// Print the fragmented parallel plan.
for (int i = 0; i < fragments.size(); ++i) {
PlanFragment fragment = fragments.get(i);
- str.append(fragment.getExplainString(explainLevel));
+ str.append(fragment.getExplainString(ctx_.getQueryOptions(), explainLevel));
if (i < fragments.size() - 1) str.append("\n");
}
}
@@ -337,91 +341,46 @@ public class Planner {
}
/**
- * Returns true if the fragments are for a trivial, coordinator-only query:
- * Case 1: Only an EmptySetNode, e.g. query has a limit 0.
- * Case 2: Query has only constant exprs.
- */
- private static boolean isTrivialCoordOnlyPlan(List<PlanFragment> fragments) {
- Preconditions.checkNotNull(fragments);
- Preconditions.checkState(!fragments.isEmpty());
- if (fragments.size() > 1) return false;
- PlanNode root = fragments.get(0).getPlanRoot();
- if (root instanceof EmptySetNode) return true;
- if (root instanceof UnionNode && ((UnionNode) root).isConstantUnion()) return true;
- return false;
- }
-
- /**
- * Estimates the per-host memory and CPU requirements for the given plan fragments,
- * and sets the results in request.
- * Optionally excludes the requirements for unpartitioned fragments.
+ * Estimates the per-host resource requirements for the given plans, and sets the
+ * results in request.
* TODO: The LOG.warn() messages should eventually become Preconditions checks
* once resource estimation is more robust.
- * TODO: Revisit and possibly remove during MT work, particularly references to vcores.
*/
- public void computeResourceReqs(List<PlanFragment> fragments,
- boolean excludeUnpartitionedFragments,
+ public void computeResourceReqs(List<PlanFragment> planRoots,
TQueryExecRequest request) {
- Preconditions.checkState(!fragments.isEmpty());
+ Preconditions.checkState(!planRoots.isEmpty());
Preconditions.checkNotNull(request);
- // Compute pipelined plan node sets.
- ArrayList<PipelinedPlanNodeSet> planNodeSets =
- PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
-
- // Compute the max of the per-host mem and vcores requirement.
- // Note that the max mem and vcores may come from different plan node sets.
- long maxPerHostMem = Long.MIN_VALUE;
- int maxPerHostVcores = Integer.MIN_VALUE;
- for (PipelinedPlanNodeSet planNodeSet: planNodeSets) {
- if (!planNodeSet.computeResourceEstimates(
- excludeUnpartitionedFragments, ctx_.getQueryOptions())) {
- continue;
- }
- long perHostMem = planNodeSet.getPerHostMem();
- int perHostVcores = planNodeSet.getPerHostVcores();
- if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem;
- if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores;
- }
-
- // Do not ask for more cores than are in the RuntimeEnv.
- maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores());
-
- // Special case for some trivial coordinator-only queries (IMPALA-3053, IMPALA-1092).
- if (isTrivialCoordOnlyPlan(fragments)) {
- maxPerHostMem = 1024;
- maxPerHostVcores = 1;
- }
-
- // Set costs to zero if there are only unpartitioned fragments and
- // excludeUnpartitionedFragments is true.
- // TODO: handle this case with a better indication for unknown, e.g. -1 or not set.
- if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) {
- boolean allUnpartitioned = true;
- for (PlanFragment fragment: fragments) {
- if (fragment.isPartitioned()) {
- allUnpartitioned = false;
- break;
- }
+ // Compute the sum over all plans.
+ // TODO: Revisit during MT work - scheduling of fragments will change and computing
+ // the sum may not be correct or optimal.
+ ResourceProfile totalResources = ResourceProfile.invalid();
+ for (PlanFragment planRoot: planRoots) {
+ ResourceProfile planMaxResources = ResourceProfile.invalid();
+ ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
+ // Compute pipelined plan node sets.
+ ArrayList<PipelinedPlanNodeSet> planNodeSets =
+ PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
+
+ // Compute the max of the per-host resources requirement.
+ // Note that the different maxes may come from different plan node sets.
+ for (PipelinedPlanNodeSet planNodeSet : planNodeSets) {
+ TQueryOptions queryOptions = ctx_.getQueryOptions();
+ ResourceProfile perHostResources =
+ planNodeSet.computePerHostResources(queryOptions);
+ if (!perHostResources.isValid()) continue;
+ planMaxResources = ResourceProfile.max(planMaxResources, perHostResources);
}
- if (allUnpartitioned && excludeUnpartitionedFragments) {
- maxPerHostMem = 0;
- maxPerHostVcores = 0;
- }
- }
-
- if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) {
- LOG.warn("Invalid per-host memory requirement: " + maxPerHostMem);
- }
- if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) {
- LOG.warn("Invalid per-host virtual cores requirement: " + maxPerHostVcores);
+ totalResources = ResourceProfile.sum(totalResources, planMaxResources);
}
- request.setPer_host_mem_req(maxPerHostMem);
- request.setPer_host_vcores((short) maxPerHostVcores);
+ Preconditions.checkState(totalResources.getMemEstimateBytes() >= 0);
+ Preconditions.checkState(totalResources.getMinReservationBytes() >= 0);
+ request.setPer_host_mem_estimate(totalResources.getMemEstimateBytes());
+ request.setPer_host_min_reservation(totalResources.getMinReservationBytes());
if (LOG.isTraceEnabled()) {
- LOG.trace("Estimated per-host peak memory requirement: " + maxPerHostMem);
- LOG.trace("Estimated per-host virtual cores requirement: " + maxPerHostVcores);
+ LOG.trace("Per-host min buffer : " + totalResources.getMinReservationBytes());
+ LOG.trace("Estimated per-host memory: " + totalResources.getMemEstimateBytes());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
new file mode 100644
index 0000000..c0dc607
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.common.PrintUtils;
+
+/**
+ * The memory estimate and minimum reservation consumed by a set of plan nodes.
+ */
+public class ResourceProfile {
+ // True if the values below were computed; false for profiles made by invalid().
+ private final boolean isValid_;
+
+ // Estimated memory consumption in bytes (-1 in profiles made by invalid()).
+ // TODO: IMPALA-5013: currently we are inconsistent about how these estimates are
+ // derived or what they mean. Re-evaluate what they mean and either deprecate or
+ // fix them.
+ private final long memEstimateBytes_;
+
+ // Minimum buffer reservation, in bytes, required to execute (-1 if invalid).
+ private final long minReservationBytes_;
+
+ private ResourceProfile(boolean isValid, long memEstimateBytes, long minReservationBytes) {
+ isValid_ = isValid;
+ memEstimateBytes_ = memEstimateBytes;
+ minReservationBytes_ = minReservationBytes;
+ }
+
+ public ResourceProfile(long memEstimateBytes, long minReservationBytes) {
+ this(true, memEstimateBytes, minReservationBytes);
+ }
+
+ public static ResourceProfile invalid() {
+ return new ResourceProfile(false, -1, -1);
+ }
+
+ public boolean isValid() { return isValid_; }
+ public long getMemEstimateBytes() { return memEstimateBytes_; }
+ public long getMinReservationBytes() { return minReservationBytes_; }
+
+ // Returns the profile formatted for display in an explain plan, for example
+ // "mem-estimate=<bytes> mem-reservation=<bytes>"; an invalid profile prints "invalid".
+ public String getExplainString() {
+ StringBuilder output = new StringBuilder();
+ output.append("mem-estimate=");
+ output.append(isValid_ ? PrintUtils.printBytes(memEstimateBytes_) : "invalid");
+ output.append(" mem-reservation=");
+ output.append(isValid_ ? PrintUtils.printBytes(minReservationBytes_) : "invalid");
+ return output.toString();
+ }
+
+ // Returns the field-wise max of 'p1' and 'p2'; if one is invalid, returns the other.
+ public static ResourceProfile max(ResourceProfile p1, ResourceProfile p2) {
+ if (!p1.isValid()) return p2;
+ if (!p2.isValid()) return p1;
+ return new ResourceProfile(
+ Math.max(p1.getMemEstimateBytes(), p2.getMemEstimateBytes()),
+ Math.max(p1.getMinReservationBytes(), p2.getMinReservationBytes()));
+ }
+
+ // Returns the field-wise sum of 'p1' and 'p2'; if one is invalid, returns the other.
+ public static ResourceProfile sum(ResourceProfile p1, ResourceProfile p2) {
+ if (!p1.isValid()) return p2;
+ if (!p2.isValid()) return p1;
+ return new ResourceProfile(p1.getMemEstimateBytes() + p2.getMemEstimateBytes(),
+ p1.getMinReservationBytes() + p2.getMinReservationBytes());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SelectNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index e09d572..c346df9 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -27,6 +27,8 @@ import org.apache.impala.analysis.Expr;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
import com.google.common.base.Preconditions;
/**
@@ -80,6 +82,12 @@ public class SelectNode extends PlanNode {
}
@Override
+ public void computeResourceProfile(TQueryOptions queryOptions) {
+ // TODO: add an estimate
+ resourceProfile_ = new ResourceProfile(0, 0);
+ }
+
+ @Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
index 5b66d18..82a1c41 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
@@ -22,6 +22,8 @@ import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
import com.google.common.base.Preconditions;
/**
@@ -64,6 +66,12 @@ public class SingularRowSrcNode extends PlanNode {
}
@Override
+ public void computeResourceProfile(TQueryOptions queryOptions) {
+ // TODO: add an estimate
+ resourceProfile_ = new ResourceProfile(0, 0);
+ }
+
+ @Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 0533b22..ef05499 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -205,10 +205,12 @@ public class SortNode extends PlanNode {
}
@Override
- public void computeCosts(TQueryOptions queryOptions) {
+ public void computeResourceProfile(TQueryOptions queryOptions) {
Preconditions.checkState(hasValidStats());
if (useTopN_) {
- perHostMemCost_ = (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
+ long perInstanceMemEstimate =
+ (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
+ resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
return;
}
@@ -233,7 +235,15 @@ public class SortNode extends PlanNode {
// doubles the block size when there are var-len columns present.
if (hasVarLenSlots) blockSize *= 2;
double numInputBlocks = Math.ceil(fullInputSize / blockSize);
- perHostMemCost_ = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+ long perInstanceMemEstimate = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+
+ // Must be kept in sync with min_buffers_required in Sorter in be.
+ long perInstanceMinReservation = 3 * SPILLABLE_BUFFER_BYTES;
+ if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
+ perInstanceMinReservation *= 2;
+ }
+ resourceProfile_ =
+ new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
}
private static String getDisplayName(boolean isTopN, boolean isMergeOnly) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index 6143255..cbe7087 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -22,6 +22,8 @@ import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
import com.google.common.base.Preconditions;
/**
@@ -91,6 +93,12 @@ public class SubplanNode extends PlanNode {
}
@Override
+ public void computeResourceProfile(TQueryOptions queryOptions) {
+ // TODO: add an estimate
+ resourceProfile_ = new ResourceProfile(0, 0);
+ }
+
+ @Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/UnionNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index d724c59..6b8d331 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -30,6 +30,7 @@ import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TUnionNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,6 +128,12 @@ public class UnionNode extends PlanNode {
}
}
+ @Override
+ public void computeResourceProfile(TQueryOptions queryOptions) {
+ // TODO: add an estimate
+ resourceProfile_ = new ResourceProfile(0, 0);
+ }
+
/**
* Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can
* be returned directly by the union node (without materialization into a new tuple).
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index 35abc55..5847e62 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -24,6 +24,7 @@ import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TUnnestNode;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -73,6 +74,12 @@ public class UnnestNode extends PlanNode {
}
@Override
+ public void computeResourceProfile(TQueryOptions queryOptions) {
+ // TODO: add an estimate
+ resourceProfile_ = new ResourceProfile(0, 0);
+ }
+
+ @Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index d3cefa6..1348129 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -962,6 +962,9 @@ public class Frontend {
}
}
+ // Clear pre-existing lists to avoid adding duplicate entries in FE tests.
+ queryCtx.unsetTables_missing_stats();
+ queryCtx.unsetTables_with_corrupt_stats();
for (TTableName tableName: tablesMissingStats) {
queryCtx.addToTables_missing_stats(tableName);
}
@@ -972,16 +975,6 @@ public class Frontend {
queryCtx.addToTables_missing_diskids(tableName);
}
- // Compute resource requirements after scan range locations because the cost
- // estimates of scan nodes rely on them.
- try {
- planner.computeResourceReqs(fragments, true, queryExecRequest);
- } catch (Exception e) {
- // Turn exceptions into a warning to allow the query to execute.
- LOG.error("Failed to compute resource requirements for query\n" +
- queryCtx.client_request.getStmt(), e);
- }
-
// The fragment at this point has all state set, serialize it to thrift.
for (PlanFragment fragment: fragments) {
TPlanFragment thriftFragment = fragment.toThrift();
@@ -1020,6 +1013,10 @@ public class Frontend {
createPlanExecInfo(planRoot, planner, queryCtx, result));
}
+ // Compute resource requirements after scan range locations because the cost
+ // estimates of scan nodes rely on them.
+ planner.computeResourceReqs(planRoots, result);
+
// Optionally disable spilling in the backend. Allow spilling if there are plan hints
// or if all tables have stats.
boolean disableSpilling =
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5148f68..363c59c 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -369,4 +369,14 @@ public class PlannerTest extends PlannerTestBase {
// Check that the effective MT_DOP is as expected.
Assert.assertEquals(actualMtDop, expectedMtDop);
}
+
+ @Test
+ public void testResourceRequirements() {
+ // Tests the resource requirement computation from the planner.
+ TQueryOptions options = defaultQueryOptions();
+ options.setExplain_level(TExplainLevel.EXTENDED);
+ options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+ runPlannerTestFile("resource-requirements", options, false);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index d354897..eceaeca 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -390,7 +390,8 @@ public class PlannerTestBase extends FrontendTestBase {
* of 'testCase'.
*/
private void runTestCase(TestCase testCase, StringBuilder errorLog,
- StringBuilder actualOutput, String dbName, TQueryOptions options)
+ StringBuilder actualOutput, String dbName, TQueryOptions options,
+ boolean ignoreExplainHeader)
throws CatalogException {
if (options == null) {
options = defaultQueryOptions();
@@ -408,16 +409,18 @@ public class PlannerTestBase extends FrontendTestBase {
dbName, System.getProperty("user.name"));
queryCtx.client_request.query_options = options;
// Test single node plan, scan range locations, and column lineage.
- TExecRequest singleNodeExecRequest =
- testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput);
+ TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN, queryCtx,
+ ignoreExplainHeader, errorLog, actualOutput);
validateTableIds(singleNodeExecRequest);
checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
checkLimitCardinality(query, singleNodeExecRequest, errorLog);
// Test distributed plan.
- testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, errorLog, actualOutput);
+ testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, ignoreExplainHeader, errorLog,
+ actualOutput);
// test parallel plans
- testPlan(testCase, Section.PARALLELPLANS, queryCtx, errorLog, actualOutput);
+ testPlan(testCase, Section.PARALLELPLANS, queryCtx, ignoreExplainHeader, errorLog,
+ actualOutput);
}
/**
@@ -471,19 +474,26 @@ public class PlannerTestBase extends FrontendTestBase {
*
* Returns the produced exec request or null if there was an error generating
* the plan.
+ *
+ * If ignoreExplainHeader is true, the explain header with warnings and resource
+ * estimates is stripped out.
*/
private TExecRequest testPlan(TestCase testCase, Section section,
- TQueryCtx queryCtx, StringBuilder errorLog, StringBuilder actualOutput) {
+ TQueryCtx queryCtx, boolean ignoreExplainHeader,
+ StringBuilder errorLog, StringBuilder actualOutput) {
String query = testCase.getQuery();
queryCtx.client_request.setStmt(query);
+ TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
if (section == Section.PLAN) {
- queryCtx.client_request.getQuery_options().setNum_nodes(1);
+ queryOptions.setNum_nodes(1);
} else {
// for distributed and parallel execution we want to run on all available nodes
- queryCtx.client_request.getQuery_options().setNum_nodes(
+ queryOptions.setNum_nodes(
ImpalaInternalServiceConstants.NUM_NODES_ALL);
}
- if (section == Section.PARALLELPLANS) {
+ if (section == Section.PARALLELPLANS
+ && (!queryOptions.isSetMt_dop() || queryOptions.getMt_dop() == 0)) {
+ // Set mt_dop to force production of parallel plans.
queryCtx.client_request.query_options.setMt_dop(2);
}
ArrayList<String> expectedPlan = testCase.getSectionContents(section);
@@ -507,7 +517,8 @@ public class PlannerTestBase extends FrontendTestBase {
// Failed to produce an exec request.
if (execRequest == null) return null;
- String explainStr = removeExplainHeader(explainBuilder.toString());
+ String explainStr = explainBuilder.toString();
+ if (ignoreExplainHeader) explainStr = removeExplainHeader(explainStr);
actualOutput.append(explainStr);
LOG.info(section.toString() + ":" + explainStr);
if (expectedErrorMsg != null) {
@@ -702,10 +713,16 @@ public class PlannerTestBase extends FrontendTestBase {
}
protected void runPlannerTestFile(String testFile, TQueryOptions options) {
- runPlannerTestFile(testFile, "default", options);
+ runPlannerTestFile(testFile, options, true);
+ }
+
+ protected void runPlannerTestFile(String testFile, TQueryOptions options,
+ boolean ignoreExplainHeader) {
+ runPlannerTestFile(testFile, "default", options, ignoreExplainHeader);
}
- private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options) {
+ private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
+ boolean ignoreExplainHeader) {
String fileName = testDir_ + "/" + testFile + ".test";
TestFileParser queryFileParser = new TestFileParser(fileName);
StringBuilder actualOutput = new StringBuilder();
@@ -716,7 +733,8 @@ public class PlannerTestBase extends FrontendTestBase {
actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
actualOutput.append("\n");
try {
- runTestCase(testCase, errorLog, actualOutput, dbName, options);
+ runTestCase(testCase, errorLog, actualOutput, dbName, options,
+ ignoreExplainHeader);
} catch (CatalogException e) {
errorLog.append(String.format("Failed to plan query\n%s\n%s",
testCase.getQuery(), e.getMessage()));
@@ -743,10 +761,10 @@ public class PlannerTestBase extends FrontendTestBase {
}
protected void runPlannerTestFile(String testFile) {
- runPlannerTestFile(testFile, "default", null);
+ runPlannerTestFile(testFile, "default", null, true);
}
protected void runPlannerTestFile(String testFile, String dbName) {
- runPlannerTestFile(testFile, dbName, null);
+ runPlannerTestFile(testFile, dbName, null, true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 3a9855d..7effd9b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -4,42 +4,44 @@ from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
where 5 + 5 < c_custkey and o_orderkey = (2 + 2)
and (coalesce(2, 3, 4) * 10) + l_linenumber < (0 * 1)
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:SUBPLAN
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=2,1,0 row-size=52B cardinality=1500000
|
|--08:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=24B mem-reservation=0B
| | tuple-ids=2,1,0 row-size=52B cardinality=100
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=24B cardinality=1
| |
| 04:SUBPLAN
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2,1 row-size=28B cardinality=100
| |
| |--07:NESTED LOOP JOIN [CROSS JOIN]
-| | | hosts=3 per-host-mem=unavailable
+| | | mem-estimate=24B mem-reservation=0B
| | | tuple-ids=2,1 row-size=28B cardinality=10
| | |
| | |--05:SINGULAR ROW SRC
| | | parent-subplan=04
-| | | hosts=3 per-host-mem=unavailable
+| | | mem-estimate=0B mem-reservation=0B
| | | tuple-ids=1 row-size=24B cardinality=1
| | |
| | 06:UNNEST [o.o_lineitems]
| | parent-subplan=04
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 03:UNNEST [c.c_orders o]
| parent-subplan=01
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -51,7 +53,7 @@ PLAN-ROOT SINK
columns missing stats: c_orders
parquet statistics predicates: c_custkey > 10
parquet dictionary predicates: c_custkey > 10
- hosts=3 per-host-mem=unavailable
+ mem-estimate=176.00MB mem-reservation=0B
tuple-ids=0 row-size=24B cardinality=15000
====
# Test HBase scan node.
@@ -59,7 +61,9 @@ select * from functional_hbase.stringids
where string_col = cast(4 as string) and 2 + 3 = tinyint_col
and id between concat('1', '0') and upper('20')
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
00:SCAN HBASE [functional_hbase.stringids]
start key: 10
@@ -68,19 +72,21 @@ PLAN-ROOT SINK
predicates: tinyint_col = 5, string_col = '4'
table stats: 10000 rows total
column stats: all
- hosts=100 per-host-mem=unavailable
+ mem-estimate=1.00GB mem-reservation=0B
tuple-ids=0 row-size=119B cardinality=1
====
# Test datasource scan node.
select * from functional.alltypes_datasource
where tinyint_col < (pow(2, 8)) and float_col != 0 and 1 + 1 > int_col
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
00:SCAN DATA SOURCE [functional.alltypes_datasource]
data source predicates: tinyint_col < 256, int_col < 2
predicates: float_col != 0
- hosts=1 per-host-mem=unavailable
+ mem-estimate=1.00GB mem-reservation=0B
tuple-ids=0 row-size=116B cardinality=500
====
# Test aggregation.
@@ -91,20 +97,22 @@ having 1024 * 1024 * count(*) % 2 = 0
and (sm > 1 or sm > 1)
and (sm between 5 and 10)
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:AGGREGATE [FINALIZE]
| output: sum(2 + id), count(*)
| group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
| having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=264.00MB
| tuple-ids=1 row-size=17B cardinality=0
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=20B cardinality=7300
====
# Test hash join.
@@ -114,13 +122,15 @@ left outer join functional.alltypes b
a.int_col between 0 + 0 + 0 + b.bigint_col and b.bigint_col + ascii('a'))
where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2))
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:HASH JOIN [LEFT OUTER JOIN]
| hash predicates: 2 + a.id = b.id - 2
| other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col
| other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=15.68KB mem-reservation=136.00MB
| tuple-ids=0,1N row-size=28B cardinality=7300
|
|--01:SCAN HDFS [functional.alltypes b]
@@ -129,14 +139,14 @@ PLAN-ROOT SINK
| table stats: 7300 rows total
| column stats: all
| parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=128.00MB mem-reservation=0B
| tuple-ids=1 row-size=20B cardinality=730
|
00:SCAN HDFS [functional.alltypes a]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=8B cardinality=7300
====
# Test nested-loop join. Same as above but with a disjunction in the On clause.
@@ -147,12 +157,14 @@ left outer join functional.alltypes b
a.int_col between 0 + 0 + 0 + b.bigint_col and b.bigint_col + ascii('a'))
where cast(b.double_col as decimal(3, 2)) > round(1.11 + 2.22 + 3.33 + 4.44, 1)
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:NESTED LOOP JOIN [LEFT OUTER JOIN]
| join predicates: (2 + a.id = b.id - 2 OR a.int_col >= 0 + b.bigint_col AND a.int_col <= b.bigint_col + 97)
| predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=14.26KB mem-reservation=0B
| tuple-ids=0,1N row-size=28B cardinality=7300
|
|--01:SCAN HDFS [functional.alltypes b]
@@ -161,14 +173,14 @@ PLAN-ROOT SINK
| table stats: 7300 rows total
| column stats: all
| parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=128.00MB mem-reservation=0B
| tuple-ids=1 row-size=20B cardinality=730
|
00:SCAN HDFS [functional.alltypes a]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=8B cardinality=7300
====
# Test distinct aggregation with grouping.
@@ -177,26 +189,28 @@ from functional.alltypes
group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year
having 1024 * 1024 * count(*) % 2 = 0
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:AGGREGATE [FINALIZE]
| output: sum(2 + id), count:merge(*)
| group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
| having: 1048576 * count(*) % 2 = 0
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=264.00MB
| tuple-ids=2 row-size=17B cardinality=0
|
01:AGGREGATE
| output: count(*)
| group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=264.00MB
| tuple-ids=1 row-size=17B cardinality=7300
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=20B cardinality=7300
====
# Test non-grouping distinct aggregation.
@@ -204,25 +218,27 @@ select sum(distinct 1 + 1 + id)
from functional.alltypes
having 1024 * 1024 * count(*) % 2 = 0
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:AGGREGATE [FINALIZE]
| output: sum(2 + id), count:merge(*)
| having: 1048576 * zeroifnull(count(*)) % 2 = 0
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=0B
| tuple-ids=2 row-size=16B cardinality=0
|
01:AGGREGATE
| output: count(*)
| group by: 2 + id
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=264.00MB
| tuple-ids=1 row-size=16B cardinality=7300
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=4B cardinality=7300
====
# Test analytic eval node.
@@ -231,44 +247,48 @@ select first_value(1 + 1 + int_col - (1 - 1)) over
order by greatest(greatest(10, 20), bigint_col))
from functional.alltypes
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:ANALYTIC
| functions: first_value(2 + int_col - 0)
| partition by: concat('ab', string_col)
| order by: greatest(20, bigint_col) ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=16.00MB
| tuple-ids=3,2 row-size=37B cardinality=7300
|
01:SORT
| order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=16.00MB mem-reservation=48.00MB
| tuple-ids=3 row-size=29B cardinality=7300
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=29B cardinality=7300
====
# Test sort node.
select int_col from functional.alltypes
order by id * abs((factorial(5) / power(2, 4)))
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:SORT
| order by: id * 7.5 ASC
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=8.00MB mem-reservation=24.00MB
| tuple-ids=1 row-size=8B cardinality=7300
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
table stats: 7300 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=8B cardinality=7300
====
# Test HDFS table sink.
@@ -276,15 +296,16 @@ insert into functional.alltypes (id, int_col) partition(year,month)
select id, int_col, cast(1 + 1 + 1 + year as int), cast(month - (1 - 1 - 1) as int)
from functional.alltypessmall
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + year AS INT),CAST(month - -1 AS INT))]
| partitions=4
-| hosts=1 per-host-mem=unavailable
+| mem-estimate=1.56KB mem-reservation=0B
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
table stats: 100 rows total
column stats: all
- hosts=3 per-host-mem=unavailable
+ mem-estimate=32.00MB mem-reservation=0B
tuple-ids=0 row-size=16B cardinality=100
====
# Constant folding does not work across query blocks.
@@ -295,11 +316,13 @@ select sum(id + c3) from
) v2
) v3
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:AGGREGATE [FINALIZE]
| output: sum(id + 10 + 20 + 30)
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=0B
| tuple-ids=4 row-size=8B cardinality=1
|
00:SCAN HDFS [functional.alltypes]
@@ -307,6 +330,6 @@ PLAN-ROOT SINK
table stats: 7300 rows total
column stats: all
limit: 2
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=4B cardinality=2
====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index b064d2b..bad3299 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -1,100 +1,112 @@
select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
00:SCAN KUDU [functional_kudu.zipcode_incomes]
kudu predicates: id = '8600000US00601'
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=1
---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
01:EXCHANGE [UNPARTITIONED]
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=1
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B
00:SCAN KUDU [functional_kudu.zipcode_incomes]
kudu predicates: id = '8600000US00601'
- hosts=3 per-host-mem=0B
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=1
====
# The cardinality from "zip = '2'" should dominate.
select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
00:SCAN KUDU [functional_kudu.zipcode_incomes]
predicates: id != '1'
kudu predicates: zip = '2'
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=1
---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
01:EXCHANGE [UNPARTITIONED]
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=1
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B
00:SCAN KUDU [functional_kudu.zipcode_incomes]
predicates: id != '1'
kudu predicates: zip = '2'
- hosts=3 per-host-mem=0B
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=1
====
select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
00:SCAN KUDU [functional_kudu.zipcode_incomes]
kudu predicates: zip > '2', id > '1'
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=3317
---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
01:EXCHANGE [UNPARTITIONED]
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=3317
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B
00:SCAN KUDU [functional_kudu.zipcode_incomes]
kudu predicates: zip > '2', id > '1'
- hosts=3 per-host-mem=0B
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=3317
====
select * from functional_kudu.zipcode_incomes where id = '1' or id = '2'
---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
00:SCAN KUDU [functional_kudu.zipcode_incomes]
predicates: id = '1' OR id = '2'
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=2
---- DISTRIBUTEDPLAN
-F01:PLAN FRAGMENT [UNPARTITIONED]
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
01:EXCHANGE [UNPARTITIONED]
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=2
-F00:PLAN FRAGMENT [RANDOM]
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B
00:SCAN KUDU [functional_kudu.zipcode_incomes]
predicates: id = '1' OR id = '2'
- hosts=3 per-host-mem=0B
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=124B cardinality=2
====
select * from functional_kudu.alltypes where
@@ -121,13 +133,14 @@ double_col in (cast('inf' as double)) and
string_col not in ("bar") and
id in (int_col)
---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
00:SCAN KUDU [functional_kudu.alltypes]
predicates: id IN (int_col), string_col NOT IN ('bar'), bigint_col IN (9999999999999999999), double_col IN (CAST('inf' AS DOUBLE)), float_col IN (CAST('NaN' AS FLOAT)), int_col IN (9999999999), smallint_col IN (99999, 2), tinyint_col IN (1, 999), bool_col IN (1)
kudu predicates: double_col IN (0.0), float_col IN (0.0), bigint_col IN (1, 2), int_col IN (1, 2), smallint_col IN (0, 2), string_col IN ('foo', 'foo '), tinyint_col IN (1, 2), bool_col IN (TRUE)
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=126B cardinality=4
====
select * from functional_kudu.alltypes where
@@ -135,12 +148,13 @@ tinyint_col is not null and
smallint_col is null and
cast(date_string_col as tinyint) is null
---- PLAN
-F00:PLAN FRAGMENT [UNPARTITIONED]
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+ | mem-estimate=0B mem-reservation=0B
|
00:SCAN KUDU [functional_kudu.alltypes]
predicates: CAST(date_string_col AS TINYINT) IS NULL
kudu predicates: smallint_col IS NULL, tinyint_col IS NOT NULL
- hosts=3 per-host-mem=unavailable
+ mem-estimate=0B mem-reservation=0B
tuple-ids=0 row-size=126B cardinality=730
====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 4d9544d..e3dd297 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -39,17 +39,19 @@ group by bigint_col
order by cnt, bigint_col
limit 10
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:TOP-N [LIMIT=10]
| order by: count(int_col) ASC, bigint_col ASC
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=160B mem-reservation=0B
| tuple-ids=2 row-size=16B cardinality=10
|
01:AGGREGATE [FINALIZE]
| output: count(int_col)
| group by: bigint_col
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=128.00MB mem-reservation=264.00MB
| tuple-ids=1 row-size=16B cardinality=unavailable
|
00:SCAN HDFS [functional_parquet.alltypes]
@@ -59,36 +61,40 @@ PLAN-ROOT SINK
column stats: unavailable
parquet statistics predicates: id < 10
parquet dictionary predicates: id < 10
- hosts=3 per-host-mem=unavailable
+ mem-estimate=16.00MB mem-reservation=0B
tuple-ids=0 row-size=16B cardinality=unavailable
---- PARALLELPLANS
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
05:MERGING-EXCHANGE [UNPARTITIONED]
| order by: count(int_col) ASC, bigint_col ASC
| limit: 10
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=2 row-size=16B cardinality=10
|
+F01:PLAN FRAGMENT [HASH(bigint_col)] hosts=3 instances=9
02:TOP-N [LIMIT=10]
| order by: count(int_col) ASC, bigint_col ASC
-| hosts=3 per-host-mem=160B
+| mem-estimate=160B mem-reservation=0B
| tuple-ids=2 row-size=16B cardinality=10
|
04:AGGREGATE [FINALIZE]
| output: count:merge(int_col)
| group by: bigint_col
-| hosts=3 per-host-mem=128.00MB
+| mem-estimate=128.00MB mem-reservation=264.00MB
| tuple-ids=1 row-size=16B cardinality=unavailable
|
03:EXCHANGE [HASH(bigint_col)]
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1 row-size=16B cardinality=unavailable
|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
01:AGGREGATE [STREAMING]
| output: count(int_col)
| group by: bigint_col
-| hosts=3 per-host-mem=128.00MB
+| mem-estimate=128.00MB mem-reservation=0B
| tuple-ids=1 row-size=16B cardinality=unavailable
|
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
@@ -98,27 +104,29 @@ PLAN-ROOT SINK
column stats: unavailable
parquet statistics predicates: id < 10
parquet dictionary predicates: id < 10
- hosts=3 per-host-mem=16.00MB
+ mem-estimate=16.00MB mem-reservation=0B
tuple-ids=0 row-size=16B cardinality=unavailable
====
-# Single-table scan/filter/analysic should work.
+# Single-table scan/filter/analytic should work.
select row_number() over(partition by int_col order by id)
from functional_parquet.alltypes
where id < 10
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
02:ANALYTIC
| functions: row_number()
| partition by: int_col
| order by: id ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=16.00MB
| tuple-ids=4,3 row-size=16B cardinality=unavailable
|
01:SORT
| order by: int_col ASC NULLS FIRST, id ASC
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=24.00MB
| tuple-ids=4 row-size=8B cardinality=unavailable
|
00:SCAN HDFS [functional_parquet.alltypes]
@@ -128,32 +136,36 @@ PLAN-ROOT SINK
column stats: unavailable
parquet statistics predicates: id < 10
parquet dictionary predicates: id < 10
- hosts=3 per-host-mem=unavailable
+ mem-estimate=16.00MB mem-reservation=0B
tuple-ids=0 row-size=8B cardinality=unavailable
---- PARALLELPLANS
+F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
04:EXCHANGE [UNPARTITIONED]
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=4,3 row-size=16B cardinality=unavailable
|
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9
02:ANALYTIC
| functions: row_number()
| partition by: int_col
| order by: id ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=16.00MB
| tuple-ids=4,3 row-size=16B cardinality=unavailable
|
01:SORT
| order by: int_col ASC NULLS FIRST, id ASC
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=24.00MB
| tuple-ids=4 row-size=8B cardinality=unavailable
|
03:EXCHANGE [HASH(int_col)]
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=0 row-size=8B cardinality=unavailable
|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
partitions=24/24 files=24 size=156.57KB
predicates: id < 10
@@ -161,7 +173,7 @@ PLAN-ROOT SINK
column stats: unavailable
parquet statistics predicates: id < 10
parquet dictionary predicates: id < 10
- hosts=3 per-host-mem=16.00MB
+ mem-estimate=16.00MB mem-reservation=0B
tuple-ids=0 row-size=8B cardinality=unavailable
====
# Nested-loop join in a subplan should work.
@@ -169,42 +181,44 @@ select *
from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
where c_custkey < 10 and o_orderkey < 5 and l_linenumber < 3
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:SUBPLAN
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=2,1,0 row-size=562B cardinality=1500000
|
|--08:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=254B mem-reservation=0B
| | tuple-ids=2,1,0 row-size=562B cardinality=100
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=254B cardinality=1
| |
| 04:SUBPLAN
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2,1 row-size=308B cardinality=100
| |
| |--07:NESTED LOOP JOIN [CROSS JOIN]
-| | | hosts=3 per-host-mem=unavailable
+| | | mem-estimate=124B mem-reservation=0B
| | | tuple-ids=2,1 row-size=308B cardinality=10
| | |
| | |--05:SINGULAR ROW SRC
| | | parent-subplan=04
-| | | hosts=3 per-host-mem=unavailable
+| | | mem-estimate=0B mem-reservation=0B
| | | tuple-ids=1 row-size=124B cardinality=1
| | |
| | 06:UNNEST [o.o_lineitems]
| | parent-subplan=04
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 03:UNNEST [c.c_orders o]
| parent-subplan=01
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -216,49 +230,52 @@ PLAN-ROOT SINK
columns missing stats: c_orders
parquet statistics predicates: c_custkey < 10
parquet dictionary predicates: c_custkey < 10
- hosts=3 per-host-mem=unavailable
+ mem-estimate=88.00MB mem-reservation=0B
tuple-ids=0 row-size=254B cardinality=15000
---- PARALLELPLANS
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
09:EXCHANGE [UNPARTITIONED]
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=2,1,0 row-size=562B cardinality=1500000
|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
01:SUBPLAN
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=2,1,0 row-size=562B cardinality=1500000
|
|--08:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=254B
+| | mem-estimate=254B mem-reservation=0B
| | tuple-ids=2,1,0 row-size=562B cardinality=100
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=0B
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=254B cardinality=1
| |
| 04:SUBPLAN
-| | hosts=3 per-host-mem=0B
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2,1 row-size=308B cardinality=100
| |
| |--07:NESTED LOOP JOIN [CROSS JOIN]
-| | | hosts=3 per-host-mem=124B
+| | | mem-estimate=124B mem-reservation=0B
| | | tuple-ids=2,1 row-size=308B cardinality=10
| | |
| | |--05:SINGULAR ROW SRC
| | | parent-subplan=04
-| | | hosts=3 per-host-mem=0B
+| | | mem-estimate=0B mem-reservation=0B
| | | tuple-ids=1 row-size=124B cardinality=1
| | |
| | 06:UNNEST [o.o_lineitems]
| | parent-subplan=04
-| | hosts=3 per-host-mem=0B
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 03:UNNEST [c.c_orders o]
| parent-subplan=01
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -270,7 +287,7 @@ PLAN-ROOT SINK
columns missing stats: c_orders
parquet statistics predicates: c_custkey < 10
parquet dictionary predicates: c_custkey < 10
- hosts=3 per-host-mem=88.00MB
+ mem-estimate=88.00MB mem-reservation=0B
tuple-ids=0 row-size=254B cardinality=15000
====
# Hash-join in a subplan should work.
@@ -278,34 +295,36 @@ select c.*
from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2
where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:SUBPLAN
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1,0,2 row-size=286B cardinality=1500000
|
|--06:HASH JOIN [INNER JOIN]
| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=136.00MB
| | tuple-ids=1,0,2 row-size=286B cardinality=10
| |
| |--04:UNNEST [c.c_orders o2]
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 05:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=270B mem-reservation=0B
| | tuple-ids=1,0 row-size=278B cardinality=10
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=unavailable
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=270B cardinality=1
| |
| 03:UNNEST [c.c_orders o1]
| parent-subplan=01
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c]
@@ -314,41 +333,44 @@ PLAN-ROOT SINK
predicates on o1: o1.o_orderkey < 5
table stats: 150000 rows total
columns missing stats: c_orders, c_orders
- hosts=3 per-host-mem=unavailable
+ mem-estimate=88.00MB mem-reservation=0B
tuple-ids=0 row-size=270B cardinality=150000
---- PARALLELPLANS
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
07:EXCHANGE [UNPARTITIONED]
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1,0,2 row-size=286B cardinality=1500000
|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
01:SUBPLAN
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1,0,2 row-size=286B cardinality=1500000
|
|--06:HASH JOIN [INNER JOIN]
| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2
-| | hosts=3 per-host-mem=0B
+| | mem-estimate=0B mem-reservation=136.00MB
| | tuple-ids=1,0,2 row-size=286B cardinality=10
| |
| |--04:UNNEST [c.c_orders o2]
| | parent-subplan=01
-| | hosts=3 per-host-mem=0B
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=2 row-size=0B cardinality=10
| |
| 05:NESTED LOOP JOIN [CROSS JOIN]
-| | hosts=3 per-host-mem=270B
+| | mem-estimate=270B mem-reservation=0B
| | tuple-ids=1,0 row-size=278B cardinality=10
| |
| |--02:SINGULAR ROW SRC
| | parent-subplan=01
-| | hosts=3 per-host-mem=0B
+| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=270B cardinality=1
| |
| 03:UNNEST [c.c_orders o1]
| parent-subplan=01
-| hosts=3 per-host-mem=0B
+| mem-estimate=0B mem-reservation=0B
| tuple-ids=1 row-size=0B cardinality=10
|
00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
@@ -357,6 +379,6 @@ PLAN-ROOT SINK
predicates on o1: o1.o_orderkey < 5
table stats: 150000 rows total
columns missing stats: c_orders, c_orders
- hosts=3 per-host-mem=88.00MB
+ mem-estimate=88.00MB mem-reservation=0B
tuple-ids=0 row-size=270B cardinality=150000
====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9a29dfc9/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 4712b96..df5f99d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -7,21 +7,23 @@ select count(*) from functional_parquet.alltypes
where int_col > 1 and int_col * rand() > 50 and int_col is null
and int_col > tinyint_col;
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:AGGREGATE [FINALIZE]
| output: count(*)
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=0B
| tuple-ids=1 row-size=8B cardinality=1
|
00:SCAN HDFS [functional_parquet.alltypes]
- partitions=24/24 files=24 size=165.17KB
+ partitions=24/24 files=24 size=156.57KB
predicates: int_col IS NULL, int_col > 1, int_col > tinyint_col, int_col * rand() > 50
table stats: unavailable
column stats: unavailable
parquet statistics predicates: int_col > 1
parquet dictionary predicates: int_col > 1
- hosts=3 per-host-mem=unavailable
+ mem-estimate=32.00MB mem-reservation=0B
tuple-ids=0 row-size=5B cardinality=unavailable
====
# Test a variety of types
@@ -32,20 +34,22 @@ and double_col > 100.00 and date_string_col > '1993-10-01' and string_col > 'aaa
and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
and year > 2000 and month < 12;
---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
+| mem-estimate=0B mem-reservation=0B
|
01:AGGREGATE [FINALIZE]
| output: count(*)
-| hosts=3 per-host-mem=unavailable
+| mem-estimate=10.00MB mem-reservation=0B
| tuple-ids=1 row-size=8B cardinality=1
|
00:SCAN HDFS [functional_parquet.alltypes]
- partitions=22/24 files=22 size=151.24KB
+ partitions=22/24 files=22 size=143.36KB
predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
table stats: unavailable
columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', date_string_col > '1993-10-01'
parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, smallint_col > 50, tinyint_col < 50, string_col > 'aaaa', mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
- hosts=3 per-host-mem=unavailable
+ mem-estimate=128.00MB mem-reservation=0B
tuple-ids=0 row-size=80B cardinality=unavailable
====