Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/18 16:43:45 UTC

[22/32] incubator-impala git commit: IMPALA-4270: Gracefully fail unsupported queries with mt_dop > 0.

IMPALA-4270: Gracefully fail unsupported queries with mt_dop > 0.

MT_DOP > 0 is only supported for plans without distributed joins
or table sinks. This patch adds validation so that unsupported
queries fail gracefully during planning.
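
For reference, a condensed standalone sketch of the new check. The real logic
lives in SingleNodePlanner.validatePlan() (see the diff below), operates on
PlanNode/JoinNode objects and PlannerContext.hasTableSink(), and throws Impala's
NotImplementedException; the class name and plain boolean parameters here exist
only for this illustration.

  // Standalone sketch only; not the actual SingleNodePlanner code.
  public class MtDopValidationSketch {
    // The real check is additionally skipped when RuntimeEnv.INSTANCE.isTestEnv()
    // is true, so that parallel plans of unsupported shapes can still be tested.
    static void validate(int mtDop, boolean hasBaseTableJoin, boolean hasTableSink) {
      if (mtDop > 0 && (hasBaseTableJoin || hasTableSink)) {
        throw new UnsupportedOperationException(
            "MT_DOP not supported for plans with base table joins or table sinks.");
      }
    }

    public static void main(String[] args) {
      validate(0, true, false);   // OK: MT_DOP == 0 places no restriction
      validate(3, false, false);  // OK: scan/filter/agg/top-n only plan
      try {
        validate(3, true, false); // rejected: base table join with MT_DOP > 0
      } catch (UnsupportedOperationException e) {
        System.out.println(e.getMessage());
      }
    }
  }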

For scans in queries that are executable with MT_DOP > 0, we either
use the optimized MT scan node BE implementation (Parquet only) or
fall back to the conventional scan node with num_scanner_threads=1.
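
To illustrate the scan-node choice, a self-contained sketch of the condition.
The actual check is in HdfsScanNode.init() (see the diff below) and uses the
real org.apache.impala.catalog.HdfsFileFormat values returned by
computeScanRangeLocations(); the stub enum and class name below are for this
example only.

  import java.util.EnumSet;
  import java.util.Set;

  // Standalone sketch only; HdfsFileFormat here is a stub for the example.
  public class MtScanNodeChoiceSketch {
    enum HdfsFileFormat { PARQUET, TEXT, AVRO, SEQUENCE_FILE, RC_FILE }

    // Use the MT scan node only if MT_DOP > 0 and every scanned partition is
    // Parquet; otherwise the conventional scan node is used, which the planner
    // runs with num_scanner_threads=1 (see Planner.createParallelPlans()).
    static boolean useMtScanNode(int mtDop, Set<HdfsFileFormat> formats) {
      return mtDop > 0 && formats.size() == 1
          && formats.contains(HdfsFileFormat.PARQUET);
    }

    public static void main(String[] args) {
      System.out.println(useMtScanNode(3, EnumSet.of(HdfsFileFormat.PARQUET)));  // true
      System.out.println(useMtScanNode(3, EnumSet.of(HdfsFileFormat.PARQUET,
          HdfsFileFormat.TEXT)));                                                // false
      System.out.println(useMtScanNode(0, EnumSet.of(HdfsFileFormat.PARQUET)));  // false
    }
  }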

TODO: Still need to add end-to-end tests.

Change-Id: I91a60ea7b6e3ae4ee44be856615ddd3cd0af476d
Reviewed-on: http://gerrit.cloudera.org:8080/4677
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/04802535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/04802535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/04802535

Branch: refs/heads/hadoop-next
Commit: 04802535661979c50e5d06ef04e62eee677b901e
Parents: b0e87c6
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Oct 10 11:03:43 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Oct 17 09:22:57 2016 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                        |   5 +-
 common/thrift/PlanNodes.thrift                  |   5 +
 .../org/apache/impala/analysis/Analyzer.java    |  10 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  28 +-
 .../java/org/apache/impala/planner/Planner.java |  13 +-
 .../apache/impala/planner/PlannerContext.java   |  10 +-
 .../impala/planner/SingleNodePlanner.java       |  24 +-
 .../org/apache/impala/planner/PlannerTest.java  |  21 +-
 .../apache/impala/planner/PlannerTestBase.java  |  18 +-
 .../queries/PlannerTest/mt-dop-validation.test  | 350 +++++++++++++++++++
 10 files changed, 450 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 837fc09..df491dd 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -264,9 +264,12 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
   switch (tnode.node_type) {
     case TPlanNodeType::HDFS_SCAN_NODE:
       *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
-      if (state->query_options().mt_dop > 0) {
+      if (tnode.hdfs_scan_node.use_mt_scan_node) {
+        DCHECK_GT(state->query_options().mt_dop, 0);
         *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs));
       } else {
+        DCHECK(state->query_options().mt_dop == 0
+            || state->query_options().num_scanner_threads == 1);
         *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
       }
       // If true, this node requests codegen over interpretation for conjuncts

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 4cf1357..49fcfbb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -202,6 +202,11 @@ struct THdfsScanNode {
   // Number of header lines to skip at the beginning of each file of this table. Only set
   // for hdfs text files.
   6: optional i32 skip_header_line_count
+
+  // Indicates whether the MT scan node implementation should be used.
+  // If this is true then the MT_DOP query option must be > 0.
+  // TODO: Remove this option when the MT scan node supports all file formats.
+  7: optional bool use_mt_scan_node
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 3edddf2..f9909b1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -30,9 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.Privilege;
@@ -66,10 +63,14 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.util.DisjointSet;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.TSessionStateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -2246,6 +2247,9 @@ public class Analyzer {
   public String getDefaultDb() { return globalState_.queryCtx.session.database; }
   public User getUser() { return user_; }
   public TQueryCtx getQueryCtx() { return globalState_.queryCtx; }
+  public TQueryOptions getQueryOptions() {
+    return globalState_.queryCtx.getRequest().getQuery_options();
+  }
   public AuthorizationConfig getAuthzConfig() { return globalState_.authzConfig; }
   public ListMap<TNetworkAddress> getHostIndex() { return globalState_.hostIndex; }
   public ColumnLineageGraph getColumnLineageGraph() { return globalState_.lineageGraph; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 4052867..3d52aa4 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -21,9 +21,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Set;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
@@ -55,6 +53,9 @@ import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocations;
 import org.apache.impala.util.MembershipSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
@@ -107,6 +108,9 @@ public class HdfsScanNode extends ScanNode {
 // True if this scan node should use codegen for evaluating conjuncts.
   private boolean codegenConjuncts_;
 
+  // True if this scan node should use the MT implementation in the backend.
+  private boolean useMtScanNode_;
+
   // Conjuncts that can be evaluated while materializing the items (tuples) of
   // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that
   // tuple. Uses a linked hash map for consistent display in explain.
@@ -168,7 +172,16 @@ public class HdfsScanNode extends ScanNode {
     computeMemLayout(analyzer);
 
     // compute scan range locations
-    computeScanRangeLocations(analyzer);
+    Set<HdfsFileFormat> fileFormats = computeScanRangeLocations(analyzer);
+
+    // Determine backend scan node implementation to use. The optimized MT implementation
+    // is currently only supported for Parquet.
+    if (analyzer.getQueryOptions().mt_dop > 0 &&
+        fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) {
+      useMtScanNode_ = true;
+    } else {
+      useMtScanNode_ = false;
+    }
 
     // do this at the end so it can take all conjuncts and scan ranges into account
     computeStats(analyzer);
@@ -298,12 +311,15 @@ public class HdfsScanNode extends ScanNode {
   /**
    * Computes scan ranges (hdfs splits) plus their storage locations, including volume
    * ids, based on the given maximum number of bytes each scan range should scan.
+   * Returns the set of file formats being scanned.
    */
-  private void computeScanRangeLocations(Analyzer analyzer) {
+  private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) {
     long maxScanRangeLength = analyzer.getQueryCtx().getRequest().getQuery_options()
         .getMax_scan_range_length();
     scanRanges_ = Lists.newArrayList();
+    Set<HdfsFileFormat> fileFormats = Sets.newHashSet();
     for (HdfsPartition partition: partitions_) {
+      fileFormats.add(partition.getFileFormat());
       Preconditions.checkState(partition.getId() >= 0);
       for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
         for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
@@ -353,6 +369,7 @@ public class HdfsScanNode extends ScanNode {
         }
       }
     }
+    return fileFormats;
   }
 
   /**
@@ -542,6 +559,7 @@ public class HdfsScanNode extends ScanNode {
     if (skipHeaderLineCount_ > 0) {
       msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
     }
+    msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index ed4c677..8abb901 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -21,9 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ColumnLineageGraph;
@@ -43,6 +40,9 @@ import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.MaxRowsProcessedVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -117,12 +117,13 @@ public class Planner {
           "Runtime filters computed");
     }
 
+    singleNodePlanner.validatePlan(singleNodePlan);
+
     if (ctx_.isSingleNodeExec()) {
       // create one fragment containing the entire single-node plan tree
       fragments = Lists.newArrayList(new PlanFragment(
           ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
     } else {
-      singleNodePlanner.validatePlan(singleNodePlan);
       // create distributed plan
       fragments = distributedPlanner.createPlanFragments(singleNodePlan);
     }
@@ -200,10 +201,14 @@ public class Planner {
    * TODO: roll into createPlan()
    */
   public List<PlanFragment> createParallelPlans() throws ImpalaException {
+    Preconditions.checkState(ctx_.getQueryOptions().mt_dop > 0);
     ArrayList<PlanFragment> distrPlan = createPlan();
     Preconditions.checkNotNull(distrPlan);
     ParallelPlanner planner = new ParallelPlanner(ctx_);
     List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
+    // Only use one scanner thread per scan-node instance since intra-node
+    // parallelism is achieved via multiple fragment instances.
+    ctx_.getQueryOptions().setNum_scanner_threads(1);
     ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created");
     return parallelPlans;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 3275a7a..721acf9 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -25,6 +25,7 @@ import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -79,9 +80,7 @@ public class PlannerContext {
 
   public QueryStmt getQueryStmt() { return queryStmt_; }
   public TQueryCtx getQueryCtx() { return queryCtx_; }
-  public TQueryOptions getQueryOptions() {
-    return queryCtx_.getRequest().getQuery_options();
-  }
+  public TQueryOptions getQueryOptions() { return getRootAnalyzer().getQueryOptions(); }
   public AnalysisContext.AnalysisResult getAnalysisResult() { return analysisResult_; }
   public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); }
   public boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; }
@@ -91,7 +90,10 @@ public class PlannerContext {
     return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
   }
   public boolean isQuery() { return analysisResult_.isQueryStmt(); }
-
+  public boolean hasTableSink() {
+    return isInsertOrCtas() || analysisResult_.isUpdateStmt()
+        || analysisResult_.isDeleteStmt();
+  }
   public boolean hasSubplan() { return !subplans_.isEmpty(); }
   public SubplanNode getSubplan() { return subplans_.getFirst(); }
   public boolean pushSubplan(SubplanNode n) { return subplans_.offerFirst(n); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index b686fe6..434e36d 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -27,9 +27,6 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalyticInfo;
 import org.apache.impala.analysis.Analyzer;
@@ -67,6 +64,10 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.RuntimeEnv;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -148,11 +149,22 @@ public class SingleNodePlanner {
   }
 
   /**
-   * Validates a single-node plan by checking that it does not contain right or
-   * full outer joins with no equi-join conjuncts that are not inside the right child
-   * of a SubplanNode. Throws a NotImplementedException if plan validation fails.
+   * Checks that the given single-node plan is executable:
+   * - It may not contain right or full outer joins with no equi-join conjuncts that
+   *   are not inside the right child of a SubplanNode.
+   * - MT_DOP > 0 is not supported for plans with base table joins or table sinks.
+   * Throws a NotImplementedException if plan validation fails.
    */
   public void validatePlan(PlanNode planNode) throws NotImplementedException {
+    if (ctx_.getQueryOptions().mt_dop > 0 && !RuntimeEnv.INSTANCE.isTestEnv()
+        && (planNode instanceof JoinNode || ctx_.hasTableSink())) {
+      throw new NotImplementedException(
+          "MT_DOP not supported for plans with base table joins or table sinks.");
+    }
+
+    // As long as MT_DOP == 0 any join can run in a single-node plan.
+    if (ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop == 0) return;
+
     if (planNode instanceof NestedLoopJoinNode) {
       JoinNode joinNode = (JoinNode) planNode;
       JoinOperator joinOp = joinNode.getJoinOp();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 88a8631..6250969 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -17,14 +17,13 @@
 
 package org.apache.impala.planner;
 
-import org.junit.Assume;
-import org.junit.Test;
-
 import org.apache.impala.catalog.Db;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
+import org.junit.Assume;
+import org.junit.Test;
 
 // All planner tests, except for S3 specific tests should go here.
 public class PlannerTest extends PlannerTestBase {
@@ -279,4 +278,20 @@ public class PlannerTest extends PlannerTestBase {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
     runPlannerTestFile("tpch-kudu");
   }
+
+  @Test
+  public void testMtDopValidation() {
+    // Tests that queries supported with mt_dop > 0 produce a parallel plan, or
+    // throw a NotImplementedException otherwise (e.g. plan has a distributed join).
+    TQueryOptions options = defaultQueryOptions();
+    options.setMt_dop(3);
+    try {
+      // Temporarily unset the test env such that unsupported queries with mt_dop > 0
+      // throw an exception. Those are otherwise allowed for testing parallel plans.
+      RuntimeEnv.INSTANCE.setTestEnv(false);
+      runPlannerTestFile("mt-dop-validation", options);
+    } finally {
+      RuntimeEnv.INSTANCE.setTestEnv(true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 284d7e5..9c12b89 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -33,13 +33,6 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduScanToken;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.ColumnLineageGraph;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.common.FrontendTestBase;
@@ -72,6 +65,13 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTupleDescriptor;
 import org.apache.impala.thrift.TUpdateMembershipRequest;
 import org.apache.impala.util.MembershipSnapshot;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduScanToken;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -503,6 +503,7 @@ public class PlannerTestBase extends FrontendTestBase {
     // Query exec request may not be set for DDL, e.g., CTAS.
     String locationsStr = null;
     if (execRequest != null && execRequest.isSetQuery_exec_request()) {
+      if (execRequest.query_exec_request.fragments == null) return;
       buildMaps(execRequest.query_exec_request);
       // If we optimize the partition key scans, we may get all the partition key values
       // from the metadata and don't reference any table. Skip the check in this case.
@@ -563,7 +564,8 @@ public class PlannerTestBase extends FrontendTestBase {
       String query, TExecRequest execRequest, StringBuilder errorLog) {
     if (execRequest == null) return;
     if (!execRequest.isSetQuery_exec_request()
-        || execRequest.query_exec_request == null) {
+        || execRequest.query_exec_request == null
+        || execRequest.query_exec_request.fragments == null) {
       return;
     }
     for (TPlanFragment planFragment : execRequest.query_exec_request.fragments) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/04802535/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
new file mode 100644
index 0000000..fe25599
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -0,0 +1,350 @@
+# Distributed nested-loop join not allowed.
+select count(*) from
+functional_parquet.alltypestiny a,
+functional_parquet.alltypestiny b
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Distributed hash-join not allowed.
+select count(*) from
+functional_parquet.alltypestiny a,
+functional_parquet.alltypestiny b
+where a.id = b.id
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Insert not allowed.
+insert into functional_parquet.alltypes partition(year,month)
+select * from functional_parquet.alltypessmall
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# CTAS not allowed.
+create table ctas_mt_dop_test as select * from functional_parquet.alltypes
+---- PLAN
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+---- PARALLELPLANS
+not implemented: MT_DOP not supported for plans with base table joins or table sinks.
+====
+# Single-table scan/filter/agg/topn should work.
+select count(int_col) cnt from functional_parquet.alltypes
+where id < 10
+group by bigint_col
+order by cnt, bigint_col
+limit 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:TOP-N [LIMIT=10]
+|  order by: count(int_col) ASC, bigint_col ASC
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2 row-size=16B cardinality=10
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(int_col)
+|  group by: bigint_col
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=16B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=156.57KB
+   predicates: id < 10
+   table stats: unavailable
+   column stats: unavailable
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=16B cardinality=unavailable
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+05:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: count(int_col) ASC, bigint_col ASC
+|  limit: 10
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2 row-size=16B cardinality=10
+|
+02:TOP-N [LIMIT=10]
+|  order by: count(int_col) ASC, bigint_col ASC
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2 row-size=16B cardinality=10
+|
+04:AGGREGATE [FINALIZE]
+|  output: count:merge(int_col)
+|  group by: bigint_col
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=16B cardinality=unavailable
+|
+03:EXCHANGE [HASH(bigint_col)]
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=16B cardinality=unavailable
+|
+01:AGGREGATE [STREAMING]
+|  output: count(int_col)
+|  group by: bigint_col
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1 row-size=16B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   partitions=24/24 files=24 size=156.57KB
+   predicates: id < 10
+   table stats: unavailable
+   column stats: unavailable
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=16B cardinality=unavailable
+====
+# Single-table scan/filter/analytic should work.
+select row_number() over(partition by int_col order by id)
+from functional_parquet.alltypes
+where id < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: int_col
+|  order by: id ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3,2 row-size=16B cardinality=unavailable
+|
+01:SORT
+|  order by: int_col ASC NULLS FIRST, id ASC
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3 row-size=8B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=156.57KB
+   predicates: id < 10
+   table stats: unavailable
+   column stats: unavailable
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=8B cardinality=unavailable
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3,2 row-size=16B cardinality=unavailable
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: int_col
+|  order by: id ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3,2 row-size=16B cardinality=unavailable
+|
+01:SORT
+|  order by: int_col ASC NULLS FIRST, id ASC
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=3 row-size=8B cardinality=unavailable
+|
+03:EXCHANGE [HASH(int_col)]
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=0 row-size=8B cardinality=unavailable
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   partitions=24/24 files=24 size=156.57KB
+   predicates: id < 10
+   table stats: unavailable
+   column stats: unavailable
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=8B cardinality=unavailable
+====
+# Nested-loop join in a subplan should work.
+select *
+from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
+where c_custkey < 10 and o_orderkey < 5 and l_linenumber < 3
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2,1,0 row-size=562B cardinality=1500000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=2,1,0 row-size=562B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=0 row-size=254B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=2,1 row-size=308B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  hosts=3 per-host-mem=unavailable
+|  |  |  tuple-ids=2,1 row-size=308B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     hosts=3 per-host-mem=unavailable
+|  |  |     tuple-ids=1 row-size=124B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems]
+|  |     parent-subplan=04
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: c_custkey < 10, !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems), o_orderkey < 5
+   predicates on o_lineitems: l_linenumber < 3
+   table stats: 150000 rows total
+   columns missing stats: c_orders
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=254B cardinality=15000
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+09:EXCHANGE [UNPARTITIONED]
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2,1,0 row-size=562B cardinality=1500000
+|
+01:SUBPLAN
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=2,1,0 row-size=562B cardinality=1500000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=2,1,0 row-size=562B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=0 row-size=254B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=2,1 row-size=308B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  hosts=3 per-host-mem=unavailable
+|  |  |  tuple-ids=2,1 row-size=308B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     hosts=3 per-host-mem=unavailable
+|  |  |     tuple-ids=1 row-size=124B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems]
+|  |     parent-subplan=04
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: c_custkey < 10, !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems), o_orderkey < 5
+   predicates on o_lineitems: l_linenumber < 3
+   table stats: 150000 rows total
+   columns missing stats: c_orders
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=254B cardinality=15000
+====
+# Hash-join in a subplan should work.
+select c.*
+from tpch_nested_parquet.customer c, c.c_orders o1, c.c_orders o2
+where o1.o_orderkey = o2.o_orderkey + 2 and o1.o_orderkey < 5
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1,0,2 row-size=286B cardinality=1500000
+|
+|--06:HASH JOIN [INNER JOIN]
+|  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=1,0,2 row-size=286B cardinality=10
+|  |
+|  |--04:UNNEST [c.c_orders o2]
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  05:NESTED LOOP JOIN [CROSS JOIN]
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=1,0 row-size=278B cardinality=10
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=0 row-size=270B cardinality=1
+|  |
+|  03:UNNEST [c.c_orders o1]
+|     parent-subplan=01
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: !empty(c.c_orders), !empty(c.c_orders)
+   predicates on o1: o1.o_orderkey < 5
+   table stats: 150000 rows total
+   columns missing stats: c_orders, c_orders
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=270B cardinality=150000
+---- PARALLELPLANS
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1,0,2 row-size=286B cardinality=1500000
+|
+01:SUBPLAN
+|  hosts=3 per-host-mem=unavailable
+|  tuple-ids=1,0,2 row-size=286B cardinality=1500000
+|
+|--06:HASH JOIN [INNER JOIN]
+|  |  hash predicates: o1.o_orderkey = o2.o_orderkey + 2
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=1,0,2 row-size=286B cardinality=10
+|  |
+|  |--04:UNNEST [c.c_orders o2]
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  05:NESTED LOOP JOIN [CROSS JOIN]
+|  |  hosts=3 per-host-mem=unavailable
+|  |  tuple-ids=1,0 row-size=278B cardinality=10
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     hosts=3 per-host-mem=unavailable
+|  |     tuple-ids=0 row-size=270B cardinality=1
+|  |
+|  03:UNNEST [c.c_orders o1]
+|     parent-subplan=01
+|     hosts=3 per-host-mem=unavailable
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: !empty(c.c_orders), !empty(c.c_orders)
+   predicates on o1: o1.o_orderkey < 5
+   table stats: 150000 rows total
+   columns missing stats: c_orders, c_orders
+   hosts=3 per-host-mem=unavailable
+   tuple-ids=0 row-size=270B cardinality=150000
+====