You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/10/22 00:35:23 UTC

[impala] 02/02: IMPALA-8997: auto fallback to mt_dop=0

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 96143cfcf11f2a455a0f2511da67dc2e6b69d7f3
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Oct 1 20:37:05 2019 -0700

    IMPALA-8997: auto fallback to mt_dop=0
    
    Add a temporary --mt_dop_auto_fallback flag to allow a graceful transition
    to using mt_dop for workloads. When this flag is set, DML queries and joins
    that would otherwise fail with an error when run with mt_dop > 0 fall
    back to running with mt_dop = 0. This means that a user can set mt_dop
    for their queries and it will only take effect when supported.
    
    The behaviour generally does not change when this flag is not set,
    with a couple of exceptions:
    * I made mt_dop automatic for compute stats on all file formats
    * mt_dop is allowed for single node plans with inserts. The
      quirky validatePlan() logic previously disallowed this but
      allowed joins in single node plans.
    
    The checks added by this patch can be removed safely once mt_dop is
    supported by default for all queries.
    
    This includes some cleanup:
    * isDmlStmt() was stale and incorrectly implemented.
    * Various TreeNode methods (collect(), contains() and findFirstOf())
      matched only nodes of exactly the requested class and ignored
      instances of its subclasses, which was strange. Matching subclasses
      is required to make 'contains(JoinNode.class)' work correctly. I
      checked the callsites of the fixed functions and none of them would
      be affected by this change because they specified a terminal class
      without any subclasses.
      I didn't actually use this fix in the end (I had to write a custom
      tree traversal in hasUnsupportedMtDopJoin()), but figured I would
      leave the improvement in here.
    
    Testing:
    Add some basic functional tests ensuring that the fallback takes
    effect.
    
    Run basic join and insert tests with this flag enabled.
    
    Change-Id: Ie0d73d8744059874293697c8e104891a10dba04d
    Reviewed-on: http://gerrit.cloudera.org:8080/14344
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |  4 ++
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../apache/impala/analysis/AnalysisContext.java    |  2 +-
 .../java/org/apache/impala/common/TreeNode.java    |  6 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  5 +-
 .../org/apache/impala/planner/KuduScanNode.java    | 11 +---
 .../java/org/apache/impala/planner/Planner.java    | 22 ++++++++
 .../apache/impala/planner/SingleNodePlanner.java   | 32 ++++++-----
 .../org/apache/impala/service/BackendConfig.java   |  4 ++
 .../java/org/apache/impala/service/Frontend.java   | 14 ++---
 .../org/apache/impala/planner/PlannerTest.java     | 17 +++---
 .../queries/PlannerTest/mt-dop-validation.test     | 66 +++++++++++++++++-----
 .../queries/QueryTest/mt-dop-auto-fallback.test    | 33 +++++++++++
 tests/custom_cluster/test_mt_dop.py                | 16 +++++-
 tests/query_test/test_cancellation.py              |  3 +-
 16 files changed, 177 insertions(+), 62 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ccd8921..a739640 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -269,6 +269,10 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
 DEFINE_bool_hidden(unlock_mt_dop, false,
     "(Experimental) If true, allow specifying mt_dop for all queries.");
 
+DEFINE_bool_hidden(mt_dop_auto_fallback, false,
+    "(Experimental) If true, fall back to non-mt_dop if mt_dop query option is set and "
+    "a query does not support it. Has no effect if --unlock_mt_dop is true.");
+
 DEFINE_bool_hidden(recursively_list_partitions, true,
     "If true, recursively list the content of partition directories.");
 
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index aa24377..56f6a65 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -69,6 +69,7 @@ DECLARE_int32(kudu_error_buffer_size);
 DECLARE_int32(hms_event_polling_interval_s);
 DECLARE_string(authorization_factory_class);
 DECLARE_bool(unlock_mt_dop);
+DECLARE_bool(mt_dop_auto_fallback);
 DECLARE_string(ranger_service_type);
 DECLARE_string(ranger_app_id);
 DECLARE_string(authorization_provider);
@@ -152,6 +153,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_impala_build_version(::GetDaemonBuildVersion());
   cfg.__set_authorization_factory_class(FLAGS_authorization_factory_class);
   cfg.__set_unlock_mt_dop(FLAGS_unlock_mt_dop);
+  cfg.__set_mt_dop_auto_fallback(FLAGS_mt_dop_auto_fallback);
   cfg.__set_ranger_service_type(FLAGS_ranger_service_type);
   cfg.__set_ranger_app_id(FLAGS_ranger_app_id);
   cfg.__set_authorization_provider(FLAGS_authorization_provider);
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 068cedb..af29d2c 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -143,4 +143,6 @@ struct TBackendGflags {
   59: required bool unlock_zorder_sort
 
   60: required bool simplify_check_on_show_tables
+
+  61: required bool mt_dop_auto_fallback
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 29cb6f8..abce01a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -182,7 +182,7 @@ public class AnalysisContext {
     }
 
     public boolean isDmlStmt() {
-      return isInsertStmt();
+      return isInsertStmt() || isUpdateStmt() || isDeleteStmt();
     }
 
     /**
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index b88ac3c..7d3afa1 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -131,7 +131,7 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
   @SuppressWarnings("unchecked")
   public <C extends TreeNode<NodeType>, D extends C> void collect(
       Class<D> cl, Collection<D> matches) {
-    if (cl.equals(getClass())) {
+    if (cl.isAssignableFrom(getClass())) {
       matches.add((D) this);
       return;
     }
@@ -183,7 +183,7 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
    * Return true if this node or any of its children is an instance of class 'cl'.
    */
   public <C extends TreeNode<NodeType>> boolean contains(Class<C> cl) {
-    if (cl.equals(getClass())) return true;
+    if (cl.isAssignableFrom(getClass())) return true;
     for (NodeType child: children_) if (child.contains(cl)) return true;
     return false;
   }
@@ -221,7 +221,7 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   @SuppressWarnings("unchecked")
   public <C extends NodeType> C findFirstOf(Class<C> cl) {
-    if (this.getClass().equals(cl)) return (C) this;
+    if (cl.isAssignableFrom(getClass())) return (C) this;
     for (NodeType child: children_) {
       NodeType result = child.findFirstOf(cl);
       if (result != null) return (C) result;
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 41480df..5231af1 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -230,6 +230,7 @@ public class HdfsScanNode extends ScanNode {
   private long maxScanRangeNumRows_ = -1;
 
   // True if this scan node should use the MT implementation in the backend.
+  // Set in computeNodeResourceProfile().
   private boolean useMtScanNode_;
 
   // Conjuncts that can be evaluated while materializing the items (tuples) of
@@ -365,9 +366,6 @@ public class HdfsScanNode extends ScanNode {
     // compute scan range locations with optional sampling
     computeScanRangeLocations(analyzer);
 
-    useMtScanNode_ =
-        analyzer.getQueryOptions().isSetMt_dop() && analyzer.getQueryOptions().mt_dop > 0;
-
     if (fileFormats_.contains(HdfsFileFormat.PARQUET)) {
       // Compute min-max conjuncts only if the PARQUET_READ_STATISTICS query option is
       // set to true.
@@ -1564,6 +1562,7 @@ public class HdfsScanNode extends ScanNode {
     }
 
     // The non-MT scan node requires at least one scanner thread.
+    useMtScanNode_ = queryOptions.mt_dop > 0;
     int requiredThreads = useMtScanNode_ ? 0 : 1;
     int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions,
         perHostScanRanges);
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index ec19602..5e5ea96 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -96,6 +96,7 @@ public class KuduScanNode extends ScanNode {
   private final FeKuduTable kuduTable_;
 
   // True if this scan node should use the MT implementation in the backend.
+  // Set in computeNodeResourceProfile().
   private boolean useMtScanNode_;
 
   // Indexes for the set of hosts that will be used for the query.
@@ -154,15 +155,6 @@ public class KuduScanNode extends ScanNode {
     } catch (Exception e) {
       throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
     }
-
-    // Determine backend scan node implementation to use.
-    if (analyzer.getQueryOptions().isSetMt_dop() &&
-        analyzer.getQueryOptions().mt_dop > 0) {
-      useMtScanNode_ = true;
-    } else {
-      useMtScanNode_ = false;
-    }
-
     computeStats(analyzer);
   }
 
@@ -314,6 +306,7 @@ public class KuduScanNode extends ScanNode {
         kudu_scanner_thread_max_estimated_bytes;
     long mem_estimate_per_thread = Math.min(num_cols *
         estimated_bytes_per_column_per_thread, max_estimated_bytes_per_thread);
+    useMtScanNode_ = queryOptions.mt_dop > 0;
     nodeResourceProfile_ = new ResourceProfileBuilder()
         .setMemEstimateBytes(mem_estimate_per_thread * maxScannerThreads)
         .setThreadReservation(useMtScanNode_ ? 0 : 1).build();
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 6a51983..b8a3c4a 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -38,6 +38,7 @@ import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
@@ -124,6 +125,27 @@ public class Planner {
     invertJoins(singleNodePlan, ctx_.isSingleNodeExec());
     singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer());
 
+    // MT_DOP > 0 is not supported by default for plans with base table joins or table
+    // sinks: we only allow MT_DOP > 0 with such plans if --unlock_mt_dop=true is
+    // specified. We allow single node plans with mt_dop since there is no actual
+    // parallelism.
+    if (!ctx_.isSingleNodeExec()
+        && ctx_.getQueryOptions().mt_dop > 0
+        && !RuntimeEnv.INSTANCE.isTestEnv()
+        && !BackendConfig.INSTANCE.isMtDopUnlocked()
+        && (ctx_.hasTableSink() ||
+            singleNodePlanner.hasUnsupportedMtDopJoin(singleNodePlan))) {
+      if (BackendConfig.INSTANCE.mtDopAutoFallback()) {
+        // Fall back to non-dop mode. This assumes that the mt_dop value is only used
+        // in the distributed planning process, which should be generally true as long
+        // as the value isn't cached in any plan nodes.
+        ctx_.getQueryOptions().setMt_dop(0);
+      } else {
+        throw new NotImplementedException(
+            "MT_DOP not supported for plans with base table joins or table sinks.");
+      }
+    }
+
     singleNodePlanner.validatePlan(singleNodePlan);
 
     if (ctx_.isSingleNodeExec()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index f11c031..eeb6ef9 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -68,8 +68,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.common.RuntimeEnv;
-import org.apache.impala.service.BackendConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,20 +156,9 @@ public class SingleNodePlanner {
    * Checks that the given single-node plan is executable:
    * - It may not contain right or full outer joins with no equi-join conjuncts that
    *   are not inside the right child of a SubplanNode.
-   * - MT_DOP > 0 is not supported by default for plans with base table joins or table
-   *   sinks: we only allow MT_DOP > 0 with such plans if --unlock_mt_dop=true is
-   *   specified.
    * Throws a NotImplementedException if plan validation fails.
    */
   public void validatePlan(PlanNode planNode) throws NotImplementedException {
-    if (ctx_.getQueryOptions().isSetMt_dop() && ctx_.getQueryOptions().mt_dop > 0
-        && !RuntimeEnv.INSTANCE.isTestEnv()
-        && !BackendConfig.INSTANCE.isMtDopUnlocked()
-        && (planNode instanceof JoinNode || ctx_.hasTableSink())) {
-      throw new NotImplementedException(
-          "MT_DOP not supported for plans with base table joins or table sinks.");
-    }
-
     // Any join can run in a single-node plan.
     if (ctx_.isSingleNodeExec()) return;
 
@@ -200,6 +187,25 @@ public class SingleNodePlanner {
   }
 
   /**
+   * Returns true if there is a join in the plan outside of the right branch of a
+   * subplan. This specific behaviour maintains compatibility with older
+   * validatePlan() logic that allowed joins with mt_dop only in this specific case
+   * (presumably by accident).
+   */
+  public boolean hasUnsupportedMtDopJoin(PlanNode planNode) {
+    if (planNode instanceof JoinNode) return true;
+
+    if (planNode instanceof SubplanNode) {
+      return hasUnsupportedMtDopJoin(planNode.getChild(0));
+    }
+
+    for (PlanNode child : planNode.getChildren()) {
+      if (hasUnsupportedMtDopJoin(child)) return true;
+    }
+    return false;
+  }
+
+  /**
    * Creates an EmptyNode that 'materializes' the tuples of the given stmt.
    * Marks all collection-typed slots referenced in stmt as non-materialized because
    * they are never unnested, and therefore the corresponding parent scan should not
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 1f77d16..cf6b767 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -145,6 +145,10 @@ public class BackendConfig {
     return backendCfg_.unlock_mt_dop;
   }
 
+  public boolean mtDopAutoFallback() {
+    return backendCfg_.mt_dop_auto_fallback;
+  }
+
   public boolean recursivelyListPartitions() {
     return backendCfg_.recursively_list_partitions;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 8bf468d..a8e8457 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -34,8 +34,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
@@ -1170,7 +1168,7 @@ public class Frontend {
       Planner planner, PlanCtx planCtx) throws ImpalaException {
     TQueryCtx queryCtx = planner.getQueryCtx();
     AnalysisResult analysisResult = planner.getAnalysisResult();
-    boolean isMtExec = analysisResult.isQueryStmt()
+    boolean isMtExec = (analysisResult.isQueryStmt() || analysisResult.isDmlStmt())
         && queryCtx.client_request.query_options.isSetMt_dop()
         && queryCtx.client_request.query_options.mt_dop > 0;
 
@@ -1430,10 +1428,9 @@ public class Frontend {
    */
   private static void setMtDopForCatalogOp(
       AnalysisResult analysisResult, TQueryOptions queryOptions) {
-    // Set MT_DOP=4 for COMPUTE STATS on Parquet/ORC tables, unless the user has already
-    // provided another value for MT_DOP.
-    if (!queryOptions.isSetMt_dop() && analysisResult.isComputeStatsStmt()
-        && analysisResult.getComputeStatsStmt().isColumnar()) {
+    // Set MT_DOP=4 for COMPUTE STATS, unless the user has already provided another
+    // value for MT_DOP.
+    if (!queryOptions.isSetMt_dop() && analysisResult.isComputeStatsStmt()) {
       queryOptions.setMt_dop(4);
     }
     // If unset, set MT_DOP to 0 to simplify the rest of the code.
@@ -1503,8 +1500,7 @@ public class Frontend {
       AnalysisResult analysisResult, EventSequence timeline)
       throws ImpalaException {
     Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
-        || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
-        || analysisResult.isDeleteStmt());
+        || analysisResult.isCreateTableAsSelectStmt());
     TQueryCtx queryCtx = planCtx.getQueryContext();
     Planner planner = new Planner(analysisResult, queryCtx, timeline);
     TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx);
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index a0e88c2..cf6d2e2 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -668,16 +668,17 @@ public class PlannerTest extends PlannerTestBase {
       // MT_DOP is not set automatically for stmt other than COMPUTE STATS.
       testEffectiveMtDop(
           "select * from functional_parquet.alltypes", mtDop, effectiveMtDop);
-      // MT_DOP is not set automatically for COMPUTE STATS on non-Parquet tables.
+
+      // MT_DOP is set automatically for COMPUTE STATS, but can be overridden by a
+      // user-provided MT_DOP.
+      int computeStatsEffectiveMtDop = (mtDop != -1) ? mtDop : 4;
+      testEffectiveMtDop(
+          "compute stats functional_parquet.alltypes", mtDop, computeStatsEffectiveMtDop);
+      testEffectiveMtDop(
+          "compute stats functional.alltypes", mtDop, computeStatsEffectiveMtDop);
       testEffectiveMtDop(
-          "compute stats functional.alltypes", mtDop, effectiveMtDop);
+          "compute stats functional_kudu.alltypes", mtDop, computeStatsEffectiveMtDop);
     }
-    // MT_DOP is set automatically for COMPUTE STATS on Parquet tables,
-    // but can be overridden by a user-provided MT_DOP.
-    testEffectiveMtDop("compute stats functional_parquet.alltypes", -1, 4);
-    testEffectiveMtDop("compute stats functional_parquet.alltypes", 0, 0);
-    testEffectiveMtDop("compute stats functional_parquet.alltypes", 1, 1);
-    testEffectiveMtDop("compute stats functional_parquet.alltypes", 16, 16);
   }
 
   /**
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index a8abffe..671f193 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -99,14 +99,52 @@ NotImplementedException: MT_DOP not supported for plans with base table joins or
 insert into functional_parquet.alltypes partition(year,month)
 select * from functional_parquet.alltypessmall
 ---- PLAN
-NotImplementedException: MT_DOP not supported for plans with base table joins or table sinks.
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=1.01GB mem-reservation=12.09MB thread-reservation=1
+WRITE TO HDFS [functional_parquet.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=4
+|  output exprs: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+|  mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
+|
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=80B cardinality=unavailable
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional_parquet.alltypessmall]
+   HDFS partitions=4/4 files=4 size=14.51KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/4 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=0
+   tuple-ids=0 row-size=80B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 NotImplementedException: MT_DOP not supported for plans with base table joins or table sinks.
 ====
 # CTAS not allowed.
 create table ctas_mt_dop_test as select * from functional_parquet.alltypes
 ---- PLAN
-NotImplementedException: MT_DOP not supported for plans with base table joins or table sinks.
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.10MB mem-reservation=88.00KB thread-reservation=1
+WRITE TO HDFS [default.ctas_mt_dop_test, OVERWRITE=false]
+|  partitions=1
+|  output exprs: functional_parquet.alltypes.id, functional_parquet.alltypes.bool_col, functional_parquet.alltypes.tinyint_col, functional_parquet.alltypes.smallint_col, functional_parquet.alltypes.int_col, functional_parquet.alltypes.bigint_col, functional_parquet.alltypes.float_col, functional_parquet.alltypes.double_col, functional_parquet.alltypes.date_string_col, functional_parquet.alltypes.string_col, functional_parquet.alltypes.timestamp_col, functional_parquet.alltypes.year, func [...]
+|  mem-estimate=100.00KB mem-reservation=0B thread-reservation=0
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   HDFS partitions=24/24 files=24 size=200.25KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=0
+   tuple-ids=0 row-size=80B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 NotImplementedException: MT_DOP not supported for plans with base table joins or table sinks.
 ====
@@ -137,7 +175,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=200.25KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -193,7 +231,7 @@ Per-Host Resources: mem-estimate=432.00MB mem-reservation=102.07MB thread-reserv
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=200.25KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -233,7 +271,7 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=200.25KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -282,7 +320,7 @@ Per-Host Resources: mem-estimate=30.32MB mem-reservation=30.00MB thread-reservat
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 Per-Host Resources: mem-estimate=48.00MB mem-reservation=48.00KB thread-reservation=3
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=200.25KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -351,12 +389,12 @@ PLAN-ROOT SINK
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=288.99MB
+   HDFS partitions=1/1 files=4 size=288.98MB
    predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
    predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
    stored statistics:
-     table: rows=150.00K size=288.99MB
+     table: rows=150.00K size=288.98MB
      columns missing stats: c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
@@ -427,12 +465,12 @@ Per-Host Resources: mem-estimate=312.00MB mem-reservation=312.00MB thread-reserv
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
-   HDFS partitions=1/1 files=4 size=288.99MB
+   HDFS partitions=1/1 files=4 size=288.98MB
    predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
    predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
    stored statistics:
-     table: rows=150.00K size=288.99MB
+     table: rows=150.00K size=288.98MB
      columns missing stats: c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
@@ -492,11 +530,11 @@ PLAN-ROOT SINK
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=288.99MB
+   HDFS partitions=1/1 files=4 size=288.98MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    stored statistics:
-     table: rows=150.00K size=288.99MB
+     table: rows=150.00K size=288.98MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
@@ -554,11 +592,11 @@ Per-Host Resources: mem-estimate=269.81MB mem-reservation=53.81MB thread-reserva
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
-   HDFS partitions=1/1 files=4 size=288.99MB
+   HDFS partitions=1/1 files=4 size=288.98MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    stored statistics:
-     table: rows=150.00K size=288.99MB
+     table: rows=150.00K size=288.98MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-auto-fallback.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-auto-fallback.test
new file mode 100644
index 0000000..cb0a661
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-auto-fallback.test
@@ -0,0 +1,33 @@
+====
+---- QUERY
+select min(l_orderkey), min(p_name)
+from tpch.lineitem join tpch.part on l_partkey = p_partkey;
+---- RESULTS
+1,'almond antique blue royal burnished'
+---- TYPES
+BIGINT,STRING
+---- RUNTIME_PROFILE
+row_regex:.*Query Options \(set by configuration and planner\):.*MT_DOP=0.*
+row_regex:.*All 3 execution backends \(5 fragment instances\) started.*
+row_regex:.*NumScannerThreadsStarted.*
+====
+---- QUERY
+select min(l_orderkey), min(p_name)
+from tpch_kudu.lineitem join tpch_kudu.part on l_partkey = p_partkey;
+---- RESULTS
+1,'almond antique blue royal burnished'
+---- TYPES
+BIGINT,STRING
+---- RUNTIME_PROFILE
+row_regex:.*Query Options \(set by configuration and planner\):.*MT_DOP=0.*
+row_regex:.*All 3 execution backends \(7 fragment instances\) started.*
+row_regex:.*NumScannerThreadsStarted.*
+====
+---- QUERY
+create table tmp as
+select * from functional.alltypes
+---- RUNTIME_PROFILE
+row_regex:.*Query Options \(set by configuration and planner\):.*MT_DOP=0.*
+row_regex:.*All 3 execution backends \(3 fragment instances\) started.*
+row_regex:.*NumScannerThreadsStarted.*
+====
diff --git a/tests/custom_cluster/test_mt_dop.py b/tests/custom_cluster/test_mt_dop.py
index b0eb01b..cc83b56 100644
--- a/tests/custom_cluster/test_mt_dop.py
+++ b/tests/custom_cluster/test_mt_dop.py
@@ -53,8 +53,22 @@ class TestMtDopFlags(CustomClusterTestSuite):
     self.run_test_case('QueryTest/joins_mt_dop', vector,
        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--mt_dop_auto_fallback=true")
+  def test_mt_dop_fallback(self, vector, unique_database):
+    """Test joins and inserts fall back to non-mt_dop correctly."""
+    vector = deepcopy(vector)
+    vector.get_value('exec_option')['mt_dop'] = 4
+    # Targeted test case that verifies that the fallback actually switches to the
+    # non-mt-dop plans.
+    self.run_test_case('QueryTest/mt-dop-auto-fallback', vector, use_db=unique_database)
+
+    # Check that the join and insert plans work as expected.
+    self.run_test_case('QueryTest/joins', vector, use_db="functional_parquet")
+    self.run_test_case('QueryTest/insert', vector)
+
   @CustomClusterTestSuite.with_args(impalad_args="--unlock_mt_dop=true", cluster_size=1)
-  def test_mt_dop_runtime_filters_one_node(self, vector, unique_database):
+  def test_mt_dop_runtime_filters_one_node(self, vector):
     """Runtime filter tests, which assume 3 fragment instances, can also be run on a single
     node cluster to test multiple filter sources/destinations per backend."""
     vector.get_value('exec_option')['mt_dop'] = 3
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index c6570f1..e82b864 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -160,9 +160,10 @@ class TestCancellation(ImpalaTestSuite):
     if not debug_action and ('count' in query or 'limit' in query):
       self.execute_query(query, vector.get_value('exec_option'))
 
+  @pytest.mark.execute_serially
   def test_misformatted_profile_text(self):
     """Tests that canceled queries have no whitespace formatting errors in their profiles
-    (IMPALA-2063)."""
+    (IMPALA-2063). Executes serially because it is timing-dependent and can be flaky."""
     query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)"
     client = self.hs2_client
     # Start query