You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2016/10/18 06:00:50 UTC
[2/7] incubator-impala git commit: IMPALA-4270: Gracefully fail
unsupported queries with mt_dop > 0.
IMPALA-4270: Gracefully fail unsupported queries with mt_dop > 0.
MT_DOP > 0 is only supported for plans without distributed joins
or table sinks. Adds validation to fail unsupported queries
gracefully in planning.
For scans in queries that are executable with MT_DOP > 0 we either
use the optimized MT scan node BE implementation (only Parquet), or
we use the conventional scan node with num_scanner_threads=1.
TODO: Still need to add end-to-end tests.
Change-Id: I91a60ea7b6e3ae4ee44be856615ddd3cd0af476d
Reviewed-on: http://gerrit.cloudera.org:8080/4677
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/04802535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/04802535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/04802535
Branch: refs/heads/master
Commit: 04802535661979c50e5d06ef04e62eee677b901e
Parents: b0e87c6
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Oct 10 11:03:43 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Oct 17 09:22:57 2016 +0000
----------------------------------------------------------------------
be/src/exec/exec-node.cc | 5 +-
common/thrift/PlanNodes.thrift | 5 +
.../org/apache/impala/analysis/Analyzer.java | 10 +-
.../org/apache/impala/planner/HdfsScanNode.java | 28 +-
.../java/org/apache/impala/planner/Planner.java | 13 +-
.../apache/impala/planner/PlannerContext.java | 10 +-
.../impala/planner/SingleNodePlanner.java | 24 +-
.../org/apache/impala/planner/PlannerTest.java | 21 +-
.../apache/impala/planner/PlannerTestBase.java | 18 +-
.../queries/PlannerTest/mt-dop-validation.test | 350 +++++++++++++++++++
10 files changed, 450 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 837fc09..df491dd 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -264,9 +264,12 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
switch (tnode.node_type) {
case TPlanNodeType::HDFS_SCAN_NODE:
*node = pool->Add(new HdfsScanNode(pool, tnode, descs));
- if (state->query_options().mt_dop > 0) {
+ if (tnode.hdfs_scan_node.use_mt_scan_node) {
+ DCHECK_GT(state->query_options().mt_dop, 0);
*node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs));
} else {
+ DCHECK(state->query_options().mt_dop == 0
+ || state->query_options().num_scanner_threads == 1);
*node = pool->Add(new HdfsScanNode(pool, tnode, descs));
}
// If true, this node requests codegen over interpretation for conjuncts
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 4cf1357..49fcfbb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -202,6 +202,11 @@ struct THdfsScanNode {
// Number of header lines to skip at the beginning of each file of this table. Only set
// for hdfs text files.
6: optional i32 skip_header_line_count
+
+ // Indicates whether the MT scan node implementation should be used.
+ // If this is true then the MT_DOP query option must be > 0.
+ // TODO: Remove this option when the MT scan node supports all file formats.
+ 7: optional bool use_mt_scan_node
}
struct TDataSourceScanNode {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 3edddf2..f9909b1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -30,9 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.impala.analysis.Path.PathType;
import org.apache.impala.authorization.AuthorizationConfig;
import org.apache.impala.authorization.Privilege;
@@ -66,10 +63,14 @@ import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.util.DisjointSet;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.TSessionStateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -2246,6 +2247,9 @@ public class Analyzer {
public String getDefaultDb() { return globalState_.queryCtx.session.database; }
public User getUser() { return user_; }
public TQueryCtx getQueryCtx() { return globalState_.queryCtx; }
+ public TQueryOptions getQueryOptions() {
+ return globalState_.queryCtx.getRequest().getQuery_options();
+ }
public AuthorizationConfig getAuthzConfig() { return globalState_.authzConfig; }
public ListMap<TNetworkAddress> getHostIndex() { return globalState_.hostIndex; }
public ColumnLineageGraph getColumnLineageGraph() { return globalState_.lineageGraph; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 4052867..3d52aa4 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -21,9 +21,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
@@ -55,6 +53,9 @@ import org.apache.impala.thrift.TScanRange;
import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocations;
import org.apache.impala.util.MembershipSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
@@ -107,6 +108,9 @@ public class HdfsScanNode extends ScanNode {
// True if this scan node should use codegen for evaluating conjuncts.
private boolean codegenConjuncts_;
+ // True if this scan node should use the MT implementation in the backend.
+ private boolean useMtScanNode_;
+
// Conjuncts that can be evaluated while materializing the items (tuples) of
// collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that
// tuple. Uses a linked hash map for consistent display in explain.
@@ -168,7 +172,16 @@ public class HdfsScanNode extends ScanNode {
computeMemLayout(analyzer);
// compute scan range locations
- computeScanRangeLocations(analyzer);
+ Set<HdfsFileFormat> fileFormats = computeScanRangeLocations(analyzer);
+
+ // Determine backend scan node implementation to use. The optimized MT implementation
+ // is currently only supported for Parquet.
+ if (analyzer.getQueryOptions().mt_dop > 0 &&
+ fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) {
+ useMtScanNode_ = true;
+ } else {
+ useMtScanNode_ = false;
+ }
// do this at the end so it can take all conjuncts and scan ranges into account
computeStats(analyzer);
@@ -298,12 +311,15 @@ public class HdfsScanNode extends ScanNode {
/**
* Computes scan ranges (hdfs splits) plus their storage locations, including volume
* ids, based on the given maximum number of bytes each scan range should scan.
+ * Returns the set of file formats being scanned.
*/
- private void computeScanRangeLocations(Analyzer analyzer) {
+ private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) {
long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options()
.getMax_scan_range_length();
scanRanges_ = Lists.newArrayList();
+ Set<HdfsFileFormat> fileFormats = Sets.newHashSet();
for (HdfsPartition partition: partitions_) {
+ fileFormats.add(partition.getFileFormat());
Preconditions.checkState(partition.getId() >= 0);
for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
@@ -353,6 +369,7 @@ public class HdfsScanNode extends ScanNode {
}
}
}
+ return fileFormats;
}
/**
@@ -542,6 +559,7 @@ public class HdfsScanNode extends ScanNode {
if (skipHeaderLineCount_ > 0) {
msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
}
+ msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index ed4c677..8abb901 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -21,9 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.ColumnLineageGraph;
@@ -43,6 +40,9 @@ import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.MaxRowsProcessedVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -117,12 +117,13 @@ public class Planner {
"Runtime filters computed");
}
+ singleNodePlanner.validatePlan(singleNodePlan);
+
if (ctx_.isSingleNodeExec()) {
// create one fragment containing the entire single-node plan tree
fragments = Lists.newArrayList(new PlanFragment(
ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
} else {
- singleNodePlanner.validatePlan(singleNodePlan);
// create distributed plan
fragments = distributedPlanner.createPlanFragments(singleNodePlan);
}
@@ -200,10 +201,14 @@ public class Planner {
* TODO: roll into createPlan()
*/
public List<PlanFragment> createParallelPlans() throws ImpalaException {
+ Preconditions.checkState(ctx_.getQueryOptions().mt_dop > 0);
ArrayList<PlanFragment> distrPlan = createPlan();
Preconditions.checkNotNull(distrPlan);
ParallelPlanner planner = new ParallelPlanner(ctx_);
List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
+ // Only use one scanner thread per scan-node instance since intra-node
+ // parallelism is achieved via multiple fragment instances.
+ ctx_.getQueryOptions().setNum_scanner_threads(1);
ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created");
return parallelPlans;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 3275a7a..721acf9 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -25,6 +25,7 @@ import org.apache.impala.analysis.QueryStmt;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
+
import com.google.common.collect.Lists;
/**
@@ -79,9 +80,7 @@ public class PlannerContext {
public QueryStmt getQueryStmt() { return queryStmt_; }
public TQueryCtx getQueryCtx() { return queryCtx_; }
- public TQueryOptions getQueryOptions() {
- return queryCtx_.getRequest().getQuery_options();
- }
+ public TQueryOptions getQueryOptions() { return getRootAnalyzer().getQueryOptions(); }
public AnalysisContext.AnalysisResult getAnalysisResult() { return analysisResult_; }
public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); }
public boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; }
@@ -91,7 +90,10 @@ public class PlannerContext {
return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
}
public boolean isQuery() { return analysisResult_.isQueryStmt(); }
-
+ public boolean hasTableSink() {
+ return isInsertOrCtas() || analysisResult_.isUpdateStmt()
+ || analysisResult_.isDeleteStmt();
+ }
public boolean hasSubplan() { return !subplans_.isEmpty(); }
public SubplanNode getSubplan() { return subplans_.getFirst(); }
public boolean pushSubplan(SubplanNode n) { return subplans_.offerFirst(n); }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index b686fe6..434e36d 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -27,9 +27,6 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.AnalyticInfo;
import org.apache.impala.analysis.Analyzer;
@@ -67,6 +64,10 @@ import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.Pair;
+import org.apache.impala.common.RuntimeEnv;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -148,11 +149,22 @@ public class SingleNodePlanner {
}
/**
- * Validates a single-node plan by checking that it does not contain right or
- * full outer joins with no equi-join conjuncts that are not inside the right child
- * of a SubplanNode. Throws a NotImplementedException if plan validation fails.
+ * Checks that the given single-node plan is executable:
+ * - It may not contain right or full outer joins with no equi-join conjuncts that
+ * are not inside the right child of a SubplanNode.
+ * - MT_DOP > 0 is not supported for plans with base table joins or table sinks.
+ * Throws a NotImplementedException if plan validation fails.
*/
public void validatePlan(PlanNode planNode) throws NotImplementedException {
+ if (ctx_.getQueryOptions().mt_dop > 0 && !RuntimeEnv.INSTANCE.isTestEnv()
+ && (planNode instanceof JoinNode || ctx_.hasTableSink())) {
+ throw new NotImplementedException(
+ "MT_DOP not supported for plans with base table joins or table sinks.");
+ }
+
+ // As long as MT_DOP == 0 any join can run in a single-node plan.
+ if (ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop == 0) return;
+
if (planNode instanceof NestedLoopJoinNode) {
JoinNode joinNode = (JoinNode) planNode;
JoinOperator joinOp = joinNode.getJoinOp();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 88a8631..6250969 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -17,14 +17,13 @@
package org.apache.impala.planner;
-import org.junit.Assume;
-import org.junit.Test;
-
import org.apache.impala.catalog.Db;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterMode;
+import org.junit.Assume;
+import org.junit.Test;
// All planner tests, except for S3 specific tests should go here.
public class PlannerTest extends PlannerTestBase {
@@ -279,4 +278,20 @@ public class PlannerTest extends PlannerTestBase {
Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
runPlannerTestFile("tpch-kudu");
}
+
+ @Test
+ public void testMtDopValidation() {
+ // Tests that queries supported with mt_dop > 0 produce a parallel plan, or
+ // throw a NotImplementedException otherwise (e.g. plan has a distributed join).
+ TQueryOptions options = defaultQueryOptions();
+ options.setMt_dop(3);
+ try {
+ // Temporarily unset the test env such that unsupported queries with mt_dop > 0
+ // throw an exception. Those are otherwise allowed for testing parallel plans.
+ RuntimeEnv.INSTANCE.setTestEnv(false);
+ runPlannerTestFile("mt-dop-validation", options);
+ } finally {
+ RuntimeEnv.INSTANCE.setTestEnv(true);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 284d7e5..9c12b89 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -33,13 +33,6 @@ import java.util.regex.Pattern;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduScanToken;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.impala.analysis.ColumnLineageGraph;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.common.FrontendTestBase;
@@ -72,6 +65,13 @@ import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTupleDescriptor;
import org.apache.impala.thrift.TUpdateMembershipRequest;
import org.apache.impala.util.MembershipSnapshot;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -503,6 +503,7 @@ public class PlannerTestBase extends FrontendTestBase {
// Query exec request may not be set for DDL, e.g., CTAS.
String locationsStr = null;
if (execRequest != null && execRequest.isSetQuery_exec_request()) {
+ if (execRequest.query_exec_request.fragments == null) return;
buildMaps(execRequest.query_exec_request);
// If we optimize the partition key scans, we may get all the partition key values
// from the metadata and don't reference any table. Skip the check in this case.
@@ -563,7 +564,8 @@ public class PlannerTestBase extends FrontendTestBase {
String query, TExecRequest execRequest, StringBuilder errorLog) {
if (execRequest == null) return;
if (!execRequest.isSetQuery_exec_request()
- || execRequest.query_exec_request == null) {
+ || execRequest.query_exec_request == null
+ || execRequest.query_exec_request.fragments == null) {
return;
}
for (TPlanFragment planFragment : execRequest.query_exec_request.fragments) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
new file mode 100644
index 0000000..fe25599
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -0,0 +1,350 @@
+# Distributed nested-loop join not allowed.
+select count(*) from
+functional_parquet.alltypestiny a,
+functional_parquet.alltypestiny b
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Distributed hash-join not allowed.
+select count(*) from
+functional_parquet.alltypestiny a,
+functional_parquet.alltypestiny b
+where a.id = b.id
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Insert not allowed.
+insert into functional_parquet.alltypes partition(year,month)
+select * from functional_parquet.alltypessmall
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# CTAS not allowed.
+create table ctas_mt_dop_test as select * from functional_parquet.alltypes
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Single-table scan/filter/agg/topn should work.
+select count(int_col) cnt from functional_parquet.alltypes
+where id < 10
+group by bigint_col
+order by cnt, bigint_col
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:TOP-N [LIMIT=10]
+| order by: count(int_col) ASC, bigint_col ASC
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=2 row-size=16B cardinality=10
+|
+01:AGGREGATE [FINALIZE]
+| output: count(int_col)
+| group by: bigint_col
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=16B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+ partitions=24/24 files=24 size=156.57KB
+ predicates: id < 10
+ table stats: unavailable
+ column stats: unavailable
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=16B cardinality=unavailable
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+05:MERGING-EXCHANGE [UNPARTITIONED]
+| order by: count(int_col) ASC, bigint_col ASC
+| limit: 10
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=2 row-size=16B cardinality=10
+|
+02:TOP-N [LIMIT=10]
+| order by: count(int_col) ASC, bigint_col ASC
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=2 row-size=16B cardinality=10
+|
+04:AGGREGATE [FINALIZE]
+| output: count:merge(int_col)
+| group by: bigint_col
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=16B cardinality=unavailable
+|
+03:EXCHANGE [HASH(bigint_col)]
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=16B cardinality=unavailable
+|
+01:AGGREGATE [STREAMING]
+| output: count(int_col)
+| group by: bigint_col
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=16B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+ partitions=24/24 files=24 size=156.57KB
+ predicates: id < 10
+ table stats: unavailable
+ column stats: unavailable
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=16B cardinality=unavailable
+====
+# Single-table scan/filter/analytic should work.
+select row_number() over(partition by int_col order by id)
+from functional_parquet.alltypes
+where id < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:ANALYTIC
+| functions: row_number()
+| partition by: int_col
+| order by: id ASC
+| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=3,2 row-size=16B cardinality=unavailable
+|
+01:SORT
+| order by: int_col ASC NULLS FIRST, id ASC
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=3 row-size=8B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+ partitions=24/24 files=24 size=156.57KB
+ predicates: id < 10
+ table stats: unavailable
+ column stats: unavailable
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=8B cardinality=unavailable
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=3,2 row-size=16B cardinality=unavailable
+|
+02:ANALYTIC
+| functions: row_number()
+| partition by: int_col
+| order by: id ASC
+| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=3,2 row-size=16B cardinality=unavailable
+|
+01:SORT
+| order by: int_col ASC NULLS FIRST, id ASC
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=3 row-size=8B cardinality=unavailable
+|
+03:EXCHANGE [HASH(int_col)]
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=0 row-size=8B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+ partitions=24/24 files=24 size=156.57KB
+ predicates: id < 10
+ table stats: unavailable
+ column stats: unavailable
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=8B cardinality=unavailable
+====
+# Nested-loop join in a subplan should work.
+select *
+from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
+where c_custkey < 10 and o_orderkey < 5 and l_linenumber < 3
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=2,1,0 row-size=562B cardinality=1500000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2,1,0 row-size=562B cardinality=100
+| |
+| |--02:SINGULAR ROW SRC
+| | parent-subplan=01
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=0 row-size=254B cardinality=1
+| |
+| 04:SUBPLAN
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2,1 row-size=308B cardinality=100
+| |
+| |--07:NESTED LOOP JOIN [CROSS JOIN]
+| | | hosts=3 per-host-mem=unavailable
+| | | tuple-ids=2,1 row-size=308B cardinality=10
+| | |
+| | |--05:SINGULAR ROW SRC
+| | | parent-subplan=04
+| | | hosts=3 per-host-mem=unavailable
+| | | tuple-ids=1 row-size=124B cardinality=1
+| | |
+| | 06:UNNEST [o.o_lineitems]
+| | parent-subplan=04
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2 row-size=0B cardinality=10
+| |
+| 03:UNNEST [c.c_orders o]
+| parent-subplan=01
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+ partitions=1/1 files=4 size=292.36MB
+ predicates: c_custkey < 10, !empty(c.c_orders)
+ predicates on o: !empty(o.o_lineitems), o_orderkey < 5
+ predicates on o_lineitems: l_linenumber < 3
+ table stats: 150000 rows total
+ columns missing stats: c_orders
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=254B cardinality=15000
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+09:EXCHANGE [UNPARTITIONED]
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=2,1,0 row-size=562B cardinality=1500000
+|
+01:SUBPLAN
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=2,1,0 row-size=562B cardinality=1500000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2,1,0 row-size=562B cardinality=100
+| |
+| |--02:SINGULAR ROW SRC
+| | parent-subplan=01
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=0 row-size=254B cardinality=1
+| |
+| 04:SUBPLAN
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2,1 row-size=308B cardinality=100
+| |
+| |--07:NESTED LOOP JOIN [CROSS JOIN]
+| | | hosts=3 per-host-mem=unavailable
+| | | tuple-ids=2,1 row-size=308B cardinality=10
+| | |
+| | |--05:SINGULAR ROW SRC
+| | | parent-subplan=04
+| | | hosts=3 per-host-mem=unavailable
+| | | tuple-ids=1 row-size=124B cardinality=1
+| | |
+| | 06:UNNEST [o.o_lineitems]
+| | parent-subplan=04
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2 row-size=0B cardinality=10
+| |
+| 03:UNNEST [c.c_orders o]
+| parent-subplan=01
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
+ partitions=1/1 files=4 size=292.36MB
+ predicates: c_custkey < 10, !empty(c.c_orders)
+ predicates on o: !empty(o.o_lineitems), o_orderkey < 5
+ predicates on o_lineitems: l_linenumber < 3
+ table stats: 150000 rows total
+ columns missing stats: c_orders
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=254B cardinality=15000
+====
+# Hash-join in a subplan should work.
+select c.*
+from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2
+where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1,0,2 row-size=286B cardinality=1500000
+|
+|--06:HASH JOIN [INNER JOIN]
+| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=1,0,2 row-size=286B cardinality=10
+| |
+| |--04:UNNEST [c.c_orders o2]
+| | parent-subplan=01
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2 row-size=0B cardinality=10
+| |
+| 05:NESTED LOOP JOIN [CROSS JOIN]
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=1,0 row-size=278B cardinality=10
+| |
+| |--02:SINGULAR ROW SRC
+| | parent-subplan=01
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=0 row-size=270B cardinality=1
+| |
+| 03:UNNEST [c.c_orders o1]
+| parent-subplan=01
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+ partitions=1/1 files=4 size=292.36MB
+ predicates: !empty(c.c_orders), !empty(c.c_orders)
+ predicates on o1: o1.o_orderkey < 5
+ table stats: 150000 rows total
+ columns missing stats: c_orders, c_orders
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=270B cardinality=150000
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1,0,2 row-size=286B cardinality=1500000
+|
+01:SUBPLAN
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1,0,2 row-size=286B cardinality=1500000
+|
+|--06:HASH JOIN [INNER JOIN]
+| | hash predicates: o1.o_orderkey = o2.o_orderkey + 2
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=1,0,2 row-size=286B cardinality=10
+| |
+| |--04:UNNEST [c.c_orders o2]
+| | parent-subplan=01
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=2 row-size=0B cardinality=10
+| |
+| 05:NESTED LOOP JOIN [CROSS JOIN]
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=1,0 row-size=278B cardinality=10
+| |
+| |--02:SINGULAR ROW SRC
+| | parent-subplan=01
+| | hosts=3 per-host-mem=unavailable
+| | tuple-ids=0 row-size=270B cardinality=1
+| |
+| 03:UNNEST [c.c_orders o1]
+| parent-subplan=01
+| hosts=3 per-host-mem=unavailable
+| tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
+ partitions=1/1 files=4 size=292.36MB
+ predicates: !empty(c.c_orders), !empty(c.c_orders)
+ predicates on o1: o1.o_orderkey < 5
+ table stats: 150000 rows total
+ columns missing stats: c_orders, c_orders
+ hosts=3 per-host-mem=unavailable
+ tuple-ids=0 row-size=270B cardinality=150000
+====