You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/08/04 07:20:04 UTC

[impala] 02/02: IMPALA-8125: Add query option to limit number of hdfs writer instances

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0a13029afccb374763fce7a916ef9da257a8bcde
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Wed Jul 8 15:55:54 2020 -0700

    IMPALA-8125: Add query option to limit number of hdfs writer instances
    
    This patch adds a new query option MAX_FS_WRITERS that limits the
    number of HDFS writer instances.
    
    Highlights:
    - Depending on the plan, it either restricts the num of instances of
      the root fragment or adds an exchange and then limits the num of
      instances of that.
    - Assigns instances evenly across available backends.
    - "no-shuffle" query hint is ignored when using query option.
    - Change in behavior of plans is only when this query option is used.
    - The only exception to the previous point is that the optimization
      logic that decides to add an exchange now looks at the num of
      instances instead of the number of nodes.
    
    Limitation:
    A mismatch of cluster state during query planning and scheduling can
    result in more or less fragment instances to be scheduled than
    expected. Eg. If max_fs_writers in 2 and the planner sees only 2
    executors then it might not add an exchange between a scan node and
    the table sink, but during scheduling if there are 3 nodes then that
    scan+tablesink instance will be scheduled on 3 backends.
    
    Testing:
    - Added planner tests to cover all cases where this enforcement kicks
      in and to highlight the behavior.
    - Added e2e tests to confirm that the scheduler is enforcing the limit
      and distributing the instance evenly across backends for different
      plan shapes.
    
    Change-Id: I17c8e61b9a32d908eec82c83618ff9caa41078a5
    Reviewed-on: http://gerrit.cloudera.org:8080/16204
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/scheduler.cc                     |  52 +-
 be/src/scheduling/scheduler.h                      |   4 +
 be/src/service/query-options-test.cc               |   1 +
 be/src/service/query-options.cc                    |  12 +
 be/src/service/query-options.h                     |   3 +-
 common/thrift/ImpalaInternalService.thrift         |   3 +
 common/thrift/ImpalaService.thrift                 |   4 +
 .../impala/analysis/CreateTableAsSelectStmt.java   |   3 +
 .../org/apache/impala/analysis/InsertStmt.java     |   9 +-
 .../apache/impala/planner/DistributedPlanner.java  |  74 ++-
 .../org/apache/impala/planner/HdfsTableSink.java   |  39 +-
 .../org/apache/impala/planner/PlanFragment.java    |   4 +
 .../java/org/apache/impala/planner/TableSink.java  |  10 +-
 .../org/apache/impala/planner/PlannerTest.java     |  12 +
 .../PlannerTest/insert-hdfs-writer-limit.test      | 630 +++++++++++++++++++++
 tests/query_test/test_insert.py                    |  77 +++
 16 files changed, 903 insertions(+), 34 deletions(-)

diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index abd28ec..ec79bd5 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -480,6 +480,15 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
       }
     }
   }
+  if (fragment.output_sink.__isset.table_sink
+      && fragment.output_sink.table_sink.__isset.hdfs_table_sink
+      && state->query_options().max_fs_writers > 0
+      && fragment_state->instance_states.size() > state->query_options().max_fs_writers) {
+    LOG(WARNING) << "Extra table sink instances scheduled, probably due to mismatch of "
+                    "cluster state during planning vs scheduling. Expected: "
+                 << state->query_options().max_fs_writers
+                 << " Found: " << fragment_state->instance_states.size();
+  }
 }
 
 vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
@@ -503,15 +512,46 @@ vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
 void Scheduler::CreateInputCollocatedInstances(
     FragmentScheduleState* fragment_state, ScheduleState* state) {
   DCHECK_GE(fragment_state->exchange_input_fragments.size(), 1);
+  const TPlanFragment& fragment = fragment_state->fragment;
   const FragmentScheduleState& input_fragment_state =
       *state->GetFragmentScheduleState(fragment_state->exchange_input_fragments[0]);
   int per_fragment_instance_idx = 0;
-  for (const FInstanceScheduleState& input_instance_state :
-      input_fragment_state.instance_states) {
-    UniqueIdPB instance_id = state->GetNextInstanceId();
-    fragment_state->instance_states.emplace_back(instance_id, input_instance_state.host,
-        input_instance_state.krpc_host, per_fragment_instance_idx++, *fragment_state);
-    *fragment_state->exec_params->add_instances() = instance_id;
+
+  if (fragment.output_sink.__isset.table_sink
+      && fragment.output_sink.table_sink.__isset.hdfs_table_sink
+      && state->query_options().max_fs_writers > 0
+      && input_fragment_state.instance_states.size()
+          > state->query_options().max_fs_writers) {
+    std::unordered_set<std::pair<NetworkAddressPB, NetworkAddressPB>> all_hosts;
+    for (const FInstanceScheduleState& input_instance_state :
+        input_fragment_state.instance_states) {
+      all_hosts.insert({input_instance_state.host, input_instance_state.krpc_host});
+    }
+    // This implementation creates the desired number of instances while balancing them
+    // across hosts and ensuring that instances on the same host get consecutive instance
+    // indexes.
+    int num_hosts = all_hosts.size();
+    int max_instances = state->query_options().max_fs_writers;
+    int instances_per_host = max_instances / num_hosts;
+    int remainder = max_instances % num_hosts;
+    auto host_itr = all_hosts.begin();
+    for (int i = 0; i < num_hosts; i++) {
+      for (int j = 0; j < instances_per_host + (i < remainder); ++j) {
+        UniqueIdPB instance_id = state->GetNextInstanceId();
+        fragment_state->instance_states.emplace_back(instance_id, host_itr->first,
+            host_itr->second, per_fragment_instance_idx++, *fragment_state);
+        *fragment_state->exec_params->add_instances() = instance_id;
+      }
+      if (host_itr != all_hosts.end()) host_itr++;
+    }
+  } else {
+    for (const FInstanceScheduleState& input_instance_state :
+        input_fragment_state.instance_states) {
+      UniqueIdPB instance_id = state->GetNextInstanceId();
+      fragment_state->instance_states.emplace_back(instance_id, input_instance_state.host,
+          input_instance_state.krpc_host, per_fragment_instance_idx++, *fragment_state);
+      *fragment_state->exec_params->add_instances() = instance_id;
+    }
   }
 }
 
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index d9abb03..ccf8948 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -398,6 +398,10 @@ class Scheduler {
 
   /// For each instance of fragment_state's input fragment, create a collocated
   /// instance for fragment_state's fragment.
+  /// Also enforces an upper limit on the number of instances in case this fragment_state
+  /// has an HDFS table writer and the MAX_FS_WRITERS query option is non zero. This
+  /// upper limit is enforced by doing a round robin assignment of instances among all the
+  /// hosts that would run the input fragment.
   /// Expects that fragment_state only has a single input fragment.
   void CreateInputCollocatedInstances(
       FragmentScheduleState* fragment_state, ScheduleState* state);
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index e9bc9ba..180ca50 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -254,6 +254,7 @@ TEST(QueryOptions, SetIntOptions) {
       {MAKE_OPTIONDEF(statement_expression_limit),
           {MIN_STATEMENT_EXPRESSION_LIMIT, I32_MAX}},
       {MAKE_OPTIONDEF(max_cnf_exprs),                  {-1, I32_MAX}},
+      {MAKE_OPTIONDEF(max_fs_writers),                 {0, I32_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int32_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 88cf84e..33e6d70 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -922,6 +922,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_sort_run_bytes_limit(sort_run_bytes_limit);
         break;
       }
+      case TImpalaQueryOptions::MAX_FS_WRITERS: {
+        StringParser::ParseResult result;
+        const int32_t max_fs_writers =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || max_fs_writers < 0) {
+          return Status(Substitute("$0 is not valid for MAX_FS_WRITERS. Only "
+                                   "non-negative numbers are allowed.",
+              value));
+        }
+        query_options->__set_max_fs_writers(max_fs_writers);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index dcde91d..82cc400 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::SORT_RUN_BYTES_LIMIT + 1);\
+      TImpalaQueryOptions::MAX_FS_WRITERS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -206,6 +206,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(enable_distinct_semi_join_optimization,\
       ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(sort_run_bytes_limit, SORT_RUN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 5bbb713..64b621a 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -435,6 +435,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   107: optional i64 sort_run_bytes_limit = -1;
+
+  // See comment in ImpalaService.thrift
+  108: optional i32 max_fs_writers = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 5b1857c..2a2ed97 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -547,6 +547,10 @@ enum TImpalaQueryOptions {
   // The max reservation that sorter will use for intermediate sort runs.
   // 0 or -1 means this has no effect.
   SORT_RUN_BYTES_LIMIT = 106
+
+  // Sets an upper limit on the number of fs writer instances to be scheduled during
+  // insert. Currently this limit only applies to HDFS inserts.
+  MAX_FS_WRITERS = 107
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 643caa8..7dc4639 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -230,6 +230,9 @@ public class CreateTableAsSelectStmt extends StatementBase {
           (tmpTable instanceof FeFsTable || tmpTable instanceof FeKuduTable));
 
       insertStmt_.setTargetTable(tmpTable);
+      if (tmpTable instanceof FeFsTable) {
+        insertStmt_.setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
+      }
     } catch (Exception e) {
       throw new AnalysisException(e.getMessage(), e);
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 8d24187..2228726 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -117,6 +117,10 @@ public class InsertStmt extends StatementBase {
   // Set in analyze(). Contains metadata of target table to determine type of sink.
   private FeTable table_;
 
+  // Set in analyze(). Set the limit on the maximum number of table sink instances.
+  // A value of 0 means no limit.
+  private int maxTableSinks_ = 0;
+
   // Set in analyze(). Exprs correspond to the partitionKeyValues, if specified, or to
   // the partition columns for Kudu tables.
   private List<Expr> partitionKeyExprs_ = new ArrayList<>();
@@ -496,6 +500,7 @@ public class InsertStmt extends StatementBase {
     }
 
     if (table_ instanceof FeFsTable) {
+      setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
       FeFsTable fsTable = (FeFsTable) table_;
       StringBuilder error = new StringBuilder();
       fsTable.parseSkipHeaderLineCount(error);
@@ -922,6 +927,7 @@ public class InsertStmt extends StatementBase {
   public TableName getTargetTableName() { return targetTableName_; }
   public FeTable getTargetTable() { return table_; }
   public void setTargetTable(FeTable table) { this.table_ = table; }
+  public void setMaxTableSinks(int maxTableSinks) { this.maxTableSinks_ = maxTableSinks; }
   public void setWriteId(long writeId) { this.writeId_ = writeId; }
   public boolean isOverwrite() { return overwrite_; }
   public TSortingOrder getSortingOrder() { return sortingOrder_; }
@@ -959,7 +965,8 @@ public class InsertStmt extends StatementBase {
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
         partitionKeyExprs_, resultExprs_, mentionedColumns_, overwrite_,
-        requiresClustering(), new Pair<>(sortColumns_, sortingOrder_), writeId_);
+        requiresClustering(), new Pair<>(sortColumns_, sortingOrder_), writeId_,
+        maxTableSinks_);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 7496bd1..864a1c1 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -27,15 +27,18 @@ import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
 import org.apache.impala.analysis.QueryStmt;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
+import org.apache.impala.thrift.TPartitionType;
 import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 
 /**
@@ -174,22 +177,24 @@ public class DistributedPlanner {
   }
 
   /**
-   * Decides whether to repartition the output of 'inputFragment' before feeding its
-   * data into the table sink of the given 'insertStmt'. The decision obeys the
-   * shuffle/noshuffle plan hints if present. Otherwise, returns a plan fragment that
-   * partitions the output of 'inputFragment' on the partition exprs of 'insertStmt',
-   * unless the expected number of partitions is less than the number of nodes on which
-   * inputFragment runs, or the target table is unpartitioned.
-   * For inserts into unpartitioned tables or inserts with only constant partition exprs,
-   * the shuffle hint leads to a plan that merges all rows at the coordinator where
-   * the table sink is executed.
-   * If this functions ends up creating a new fragment, appends that to 'fragments'.
+   * Decides whether to repartition the output of 'inputFragment' before feeding
+   * its data into the table sink of the given 'insertStmt'. The decision obeys
+   * the shuffle/noshuffle plan hints if present unless MAX_FS_WRITERS query
+   * option is used where the noshuffle hint is ignored. The decision is based on
+   * a number of factors including, whether the target table is partitioned or
+   * unpartitioned, the input fragment and the target table's partition
+   * expressions, expected number of output partitions, num of nodes on which the
+   * input partition will run, whether MAX_FS_WRITERS query option is used. If
+   * this functions ends up creating a new fragment, appends that to 'fragments'.
    */
   public PlanFragment createInsertFragment(
       PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer,
       List<PlanFragment> fragments)
       throws ImpalaException {
-    if (insertStmt.hasNoShuffleHint()) return inputFragment;
+    boolean enforce_hdfs_writer_limit = insertStmt.getTargetTable() instanceof FeFsTable
+        && analyzer.getQueryOptions().getMax_fs_writers() > 0;
+
+    if (insertStmt.hasNoShuffleHint() && !enforce_hdfs_writer_limit) return inputFragment;
 
     List<Expr> partitionExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
     // Ignore constants for the sake of partitioning.
@@ -200,11 +205,22 @@ public class DistributedPlanner {
     DataPartition inputPartition = inputFragment.getDataPartition();
     if (!partitionExprs.isEmpty()
         && analyzer.setsHaveValueTransfer(inputPartition.getPartitionExprs(),
-        partitionExprs, true)
-        && !(insertStmt.getTargetTable() instanceof FeKuduTable)) {
+            partitionExprs, true)
+        && !(insertStmt.getTargetTable() instanceof FeKuduTable)
+        && !enforce_hdfs_writer_limit) {
       return inputFragment;
     }
 
+    int maxHdfsWriters = analyzer.getQueryOptions().getMax_fs_writers();
+    // We also consider fragments containing union nodes along with scan fragments
+    // (leaf fragments) since they are either a part of those scan fragments or are
+    // co-located with them to maintain parallelism.
+    List<ScanNode> hdfsScanORUnionNodes = Lists.newArrayList();
+    inputFragment.collectPlanNodes(Predicates.instanceOf(HdfsScanNode.class),
+        hdfsScanORUnionNodes);
+    inputFragment.collectPlanNodes(Predicates.instanceOf(UnionNode.class),
+        hdfsScanORUnionNodes);
+
     // Make a cost-based decision only if no user hint was supplied.
     if (!insertStmt.hasShuffleHint()) {
       if (insertStmt.getTargetTable() instanceof FeKuduTable) {
@@ -213,12 +229,27 @@ public class DistributedPlanner {
         // TODO: make a more sophisticated decision here for partitioned tables and when
         // we have info about tablet locations.
         if (partitionExprs.isEmpty()) return inputFragment;
-      } else {
+      } else if (!enforce_hdfs_writer_limit || hdfsScanORUnionNodes.size() == 0
+          || inputFragment.getNumInstances() <= maxHdfsWriters) {
+        // Only consider skipping the addition of an exchange node if
+        // 1. The hdfs writer limit does not apply
+        // 2. Writer limit applies and there are no hdfs scan or union nodes. In this
+        //    case we will restrict the number of instances of this internal fragment.
+        // 3. Writer limit applies and there is a scan node or union node, but its num
+        //    of instances are already under the writer limit.
+        // Basically covering all cases where we don't mind restricting the parallelism
+        // of their instances.
+        int input_instances = inputFragment.getNumInstances();
+        if (enforce_hdfs_writer_limit && hdfsScanORUnionNodes.size() == 0) {
+          // For an internal fragment we enforce an upper limit based on the
+          // MAX_FS_WRITER query option.
+          input_instances = Math.min(input_instances, maxHdfsWriters);
+        }
         // If the existing partition exprs are a subset of the table partition exprs,
         // check if it is distributed across all nodes. If so, don't repartition.
         if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) {
           long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs());
-          if (numPartitions >= inputFragment.getNumNodes()) {
+          if (numPartitions >= input_instances) {
             return inputFragment;
           }
         }
@@ -231,8 +262,8 @@ public class DistributedPlanner {
         // size in the particular file format of the output table/partition.
         // We should always know on how many nodes our input is running.
         long numPartitions = getNumDistinctValues(partitionExprs);
-        Preconditions.checkState(inputFragment.getNumNodes() != -1);
-        if (numPartitions > 0 && numPartitions <= inputFragment.getNumNodes()) {
+        Preconditions.checkState(inputFragment.getNumInstances() != -1);
+        if (numPartitions > 0 && numPartitions <= input_instances) {
           return inputFragment;
         }
       }
@@ -244,7 +275,14 @@ public class DistributedPlanner {
     Preconditions.checkState(exchNode.hasValidStats());
     DataPartition partition;
     if (partitionExprs.isEmpty()) {
-      partition = DataPartition.UNPARTITIONED;
+      if (enforce_hdfs_writer_limit
+          && inputFragment.getDataPartition().getType() == TPartitionType.RANDOM) {
+        // This ensures the parallelism of the writers is maintained while maintaining
+        // legacy behavior(when not using MAX_FS_WRITER query option).
+        partition = DataPartition.RANDOM;
+      } else {
+        partition = DataPartition.UNPARTITIONED;
+      }
     } else if (insertStmt.getTargetTable() instanceof FeKuduTable) {
       partition = DataPartition.kuduPartitioned(
           KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index af67dc9..8179349 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -75,10 +75,14 @@ public class HdfsTableSink extends TableSink {
   // Stores the allocated write id if the target table is transactional, otherwise -1.
   private long writeId_;
 
+  // Set the limit on the maximum number of hdfs table sink instances.
+  // A value of 0 means no limit.
+  private int maxHdfsSinks_;
+
   public HdfsTableSink(FeTable targetTable, List<Expr> partitionKeyExprs,
-      List<Expr> outputExprs,
-      boolean overwrite, boolean inputIsClustered,
-      Pair<List<Integer>, TSortingOrder> sortProperties, long writeId) {
+      List<Expr> outputExprs, boolean overwrite, boolean inputIsClustered,
+      Pair<List<Integer>, TSortingOrder> sortProperties, long writeId,
+      int maxTableSinks) {
     super(targetTable, Op.INSERT, outputExprs);
     Preconditions.checkState(targetTable instanceof FeFsTable);
     partitionKeyExprs_ = partitionKeyExprs;
@@ -87,6 +91,7 @@ public class HdfsTableSink extends TableSink {
     sortColumns_ = sortProperties.first;
     sortingOrder_ = sortProperties.second;
     writeId_ = writeId;
+    maxHdfsSinks_ = maxTableSinks;
   }
 
   @Override
@@ -228,4 +233,32 @@ public class HdfsTableSink extends TableSink {
     // Avoid adding any partition exprs redundantly.
     exprs.addAll(outputExprs_.subList(0, targetTable_.getNonClusteringColumns().size()));
   }
+
+  /**
+   * Return an estimate of the number of nodes the fragment with this sink will
+   * run on. This is based on the number of nodes set for the plan root and has an
+   * upper limit set by the MAX_HDFS_WRITER query option.
+   */
+  public int getNumNodes() {
+    int num_nodes = getFragment().getPlanRoot().getNumNodes();
+    if (maxHdfsSinks_ > 0) {
+      // If there are more nodes than instances where the fragment was initially
+      // planned to run then, then the instances will be distributed evenly across them.
+      num_nodes = Math.min(num_nodes, getNumInstances());
+    }
+    return num_nodes;
+  }
+
+  /**
+   * Return an estimate of the number of instances the fragment with this sink
+   * will run on. This is based on the number of instances set for the plan root
+   * and has an upper limit set by the MAX_HDFS_WRITER query option.
+   */
+  public int getNumInstances() {
+    int num_instances = getFragment().getPlanRoot().getNumInstances();
+    if (maxHdfsSinks_ > 0) {
+      num_instances =  Math.min(num_instances, maxHdfsSinks_);
+    }
+    return num_instances;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index b46518c..c49695a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -372,6 +372,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       // the destination join node. ParallelPlanner sets the destination fragment when
       // adding the JoinBuildSink.
       return ((JoinBuildSink)sink_).getNumNodes();
+    } else if (sink_ instanceof HdfsTableSink) {
+      return ((HdfsTableSink)sink_).getNumNodes();
     } else {
       return planRoot_.getNumNodes();
     }
@@ -402,6 +404,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       // join. ParallelPlanner sets the destination fragment when adding the
       // JoinBuildSink.
       return ((JoinBuildSink)sink_).getNumInstances();
+    } else if (sink_ instanceof HdfsTableSink) {
+      return ((HdfsTableSink)sink_).getNumInstances();
     } else {
       return planRoot_.getNumInstances();
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 6b23ef0..f7038fb 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -102,7 +102,7 @@ public abstract class TableSink extends DataSink {
       List<Integer> referencedColumns, boolean overwrite,
       boolean inputIsClustered, Pair<List<Integer>, TSortingOrder> sortProperties) {
     return create(table, sinkAction, partitionKeyExprs, outputExprs, referencedColumns,
-        overwrite, inputIsClustered, sortProperties, -1);
+        overwrite, inputIsClustered, sortProperties, -1, 0);
   }
 
   /**
@@ -110,9 +110,9 @@ public abstract class TableSink extends DataSink {
    */
   public static TableSink create(FeTable table, Op sinkAction,
       List<Expr> partitionKeyExprs, List<Expr> outputExprs,
-      List<Integer> referencedColumns,
-      boolean overwrite, boolean inputIsClustered,
-      Pair<List<Integer>, TSortingOrder> sortProperties, long writeId) {
+      List<Integer> referencedColumns, boolean overwrite, boolean inputIsClustered,
+      Pair<List<Integer>, TSortingOrder> sortProperties, long writeId,
+      int maxTableSinks) {
     Preconditions.checkNotNull(partitionKeyExprs);
     Preconditions.checkNotNull(referencedColumns);
     Preconditions.checkNotNull(sortProperties.first);
@@ -122,7 +122,7 @@ public abstract class TableSink extends DataSink {
       // Referenced columns don't make sense for an Hdfs table.
       Preconditions.checkState(referencedColumns.isEmpty());
       return new HdfsTableSink(table, partitionKeyExprs,outputExprs, overwrite,
-          inputIsClustered, sortProperties, writeId);
+          inputIsClustered, sortProperties, writeId, maxTableSinks);
     } else if (table instanceof FeHBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 0e4ed25..f720873 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -317,6 +317,18 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testHdfsInsertWriterLimit() {
+    addTestDb("test_hdfs_insert_writer_limit",
+        "Test DB for MAX_FS_WRITERS query option.");
+    addTestTable( "create table test_hdfs_insert_writer_limit.partitioned_table "
+        + "(id int) partitioned by (year int, month int) location '/'");
+    addTestTable("create table test_hdfs_insert_writer_limit.unpartitioned_table"
+        + " (id int) location '/'");
+    runPlannerTestFile("insert-hdfs-writer-limit", "test_hdfs_insert_writer_limit",
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+  }
+
+  @Test
   public void testHdfs() {
     runPlannerTestFile("hdfs");
   }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-hdfs-writer-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-hdfs-writer-limit.test
new file mode 100644
index 0000000..c299d4d
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-hdfs-writer-limit.test
@@ -0,0 +1,630 @@
+# insert into an unpartitioned table. Single writer in the coordinator instance.
+insert into unpartitioned_table select int_col from
+functional_parquet.alltypes limit 10000000;
+---- QUERYOPTIONS
+max_fs_writers=2
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=90.69KB mem-reservation=0B thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=50.02KB mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [UNPARTITIONED]
+|  limit: 10000000
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   limit: 10000000
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into a partitioned table. Single writer in the coordinator instance.
+insert into partitioned_table partition(year, month) select int_col, year, month from
+functional_parquet.alltypes limit 10000000;
+---- QUERYOPTIONS
+max_fs_writers=2
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=6.10MB mem-reservation=6.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=100.00KB mem-reservation=0B thread-reservation=0
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [UNPARTITIONED]
+|  limit: 10000000
+|  mem-estimate=98.02KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=12B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   limit: 10000000
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into a partitioned table. Multiple writers, part of the scan instance. Writers under the max limit.
+insert into partitioned_table partition(year, month) select int_col, year, month from
+functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=30
+mt_dop=10
+---- DISTRIBUTEDPLAN
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=24
+|  Per-Instance Resources: mem-estimate=22.00MB mem-reservation=6.02MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=6.25KB mem-reservation=0B thread-reservation=0
+|
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into a partitioned table. Multiple writers, part of the scan instance. Writers above the max limit.
+# A random exchange is added to maintain parallelism of writers.
+insert into partitioned_table partition(year, month) select int_col, year, month from
+functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=11
+mt_dop=10
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(`year`,`month`)] hosts=3 instances=11
+|  Per-Instance Resources: mem-estimate=6.42MB mem-reservation=6.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=13.64KB mem-reservation=0B thread-reservation=0
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|  mem-estimate=434.02KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=12B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=24
+Per-Instance Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers, part of the scan instance. Writers under the max limit.
+insert into unpartitioned_table select int_col from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=4
+---- DISTRIBUTEDPLAN
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=16.02MB mem-reservation=16.00KB thread-reservation=2
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=16.67KB mem-reservation=0B thread-reservation=0
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers, part of the scan instance. Writers above the max limit.
+# A random exchange is added to maintain parallelism of writers.
+insert into unpartitioned_table select int_col from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=2
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=65.68KB mem-reservation=0B thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=25.01KB mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [RANDOM]
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers, part of the scan instance. Writers under the max limit.
+# Same behaviour when using mt_dop.
+insert into unpartitioned_table select int_col from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=28
+mt_dop=10
+---- DISTRIBUTEDPLAN
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=24
+|  Per-Instance Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=2.08KB mem-reservation=0B thread-reservation=0
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers, part of the scan instance. Writers above the max limit.
+# A random exchange is added to maintain parallelism of writers. Also enforced when using mt_dop
+insert into unpartitioned_table select int_col from functional_parquet.alltypes
+---- QUERYOPTIONS
+max_fs_writers=11
+mt_dop=10
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=11
+|  Per-Instance Resources: mem-estimate=213.22KB mem-reservation=0B thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=4.55KB mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [RANDOM]
+|  mem-estimate=208.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=24
+Per-Instance Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers in an internal fragment (w/o scan node).
+# Writers within the max limit.
+insert into unpartitioned_table select int_col from functional_parquet.alltypes group by int_col;
+---- QUERYOPTIONS
+max_fs_writers=4
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=128.04MB mem-reservation=34.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=16.67KB mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  group by: int_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:EXCHANGE [HASH(int_col)]
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=144.00MB mem-reservation=34.02MB thread-reservation=2
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers in an internal fragment (w/o scan node).
+# Writers over the max limit.
+insert into unpartitioned_table select int_col from functional_parquet.alltypes group by int_col;
+---- QUERYOPTIONS
+max_fs_writers=2
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=128.04MB mem-reservation=34.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=25.01KB mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  group by: int_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:EXCHANGE [HASH(int_col)]
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=144.00MB mem-reservation=34.02MB thread-reservation=2
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an unpartitioned table. Multiple writers in an internal fragment (w/o scan node).
+# Writers over the max limit.
+insert into unpartitioned_table select int_col from functional_parquet.alltypes group by int_col;
+---- QUERYOPTIONS
+max_fs_writers=4
+mt_dop=10
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=4
+|  Per-Instance Resources: mem-estimate=128.20MB mem-reservation=34.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=12.50KB mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  group by: int_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:EXCHANGE [HASH(int_col)]
+|  mem-estimate=208.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=24
+Per-Instance Resources: mem-estimate=144.00MB mem-reservation=34.02MB thread-reservation=1
+01:AGGREGATE [STREAMING]
+|  group by: int_col
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into a partitioned table. Multiple writers in an internal fragment (w/o scan node).
+# Writers within the max limit.
+insert into partitioned_table partition(year, month) select int_col, year, month from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=4
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(`year`,`month`)] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=6.10MB mem-reservation=6.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=50.02KB mem-reservation=0B thread-reservation=0
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|  mem-estimate=98.02KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=12B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into a partitioned table. Multiple writers in an internal fragment (w/o scan node).
+# Writers over the max limit.
+insert into partitioned_table partition(year, month) select int_col, year, month from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=2
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(`year`,`month`)] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=6.10MB mem-reservation=6.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=75.02KB mem-reservation=0B thread-reservation=0
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|  mem-estimate=98.02KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=12B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an partitioned table. Multiple writers in an internal fragment (w/o scan node).
+# Writers over the max limit.
+insert into partitioned_table partition(year, month) select int_col, year, month from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=4
+mt_dop=10
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(`year`,`month`)] hosts=3 instances=4
+|  Per-Instance Resources: mem-estimate=6.42MB mem-reservation=6.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=37.51KB mem-reservation=0B thread-reservation=0
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|  mem-estimate=434.02KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=12B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=24
+Per-Instance Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# insert into an partitioned table. "no shuffle" hint disabled when using the query option.
+# Writers over the max limit.
+insert /* +NOSHUFFLE */ into partitioned_table partition(year, month)
+select int_col, year, month from functional_parquet.alltypes;
+---- QUERYOPTIONS
+max_fs_writers=4
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [HASH(`year`,`month`)] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=6.10MB mem-reservation=6.00MB thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.partitioned_table, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|  output exprs: int_col, year, month
+|  mem-estimate=50.02KB mem-reservation=0B thread-reservation=0
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=2 row-size=12B cardinality=12.80K
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|  mem-estimate=98.02KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=12B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: int_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=12B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# "shuffle" hint behaves differently when used with max_fs_writers query option.
+# If the root fragment has a random data partition then a random shuffle exchange
+# instead of an unpartitioned exchange. Instance counter within limit.
+insert /* +SHUFFLE */ into unpartitioned_table select int_col from functional_parquet.alltypes
+---- QUERYOPTIONS
+max_fs_writers=4
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=57.34KB mem-reservation=0B thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=16.67KB mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [RANDOM]
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# "shuffle" hint behaves differently when used with max_fs_writers query option.
+# If the root fragment has a random data partition then a random shuffle exchange
+# instead of an unpartitioned exchange. Instance counter over limit.
+insert /* +SHUFFLE */ into unpartitioned_table select int_col from functional_parquet.alltypes
+---- QUERYOPTIONS
+max_fs_writers=2
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=65.68KB mem-reservation=0B thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=25.01KB mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [RANDOM]
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
+# "shuffle" hint behaves differently when used with max_fs_writers query option.
+# If the root fragment has a random data partition then a random shuffle exchange
+# is added to maintain parallelism instead of an unpartitioned exchange.
+# Test to show behaviour without using the query option
+insert /* +SHUFFLE */ into unpartitioned_table select int_col from functional_parquet.alltypes
+---- QUERYOPTIONS
+max_fs_writers=0
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=90.69KB mem-reservation=0B thread-reservation=1
+WRITE TO HDFS [test_hdfs_insert_writer_limit.unpartitioned_table, OVERWRITE=false]
+|  partitions=1
+|  output exprs: int_col
+|  mem-estimate=50.02KB mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=40.67KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=4B cardinality=12.80K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
+00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=201.26KB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=12.80K
+   in pipelines: 00(GETNEXT)
+====
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 52d4df9..4832664 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -19,6 +19,7 @@
 
 import os
 import pytest
+import re
 
 from testdata.common import widetable
 from tests.common.impala_cluster import ImpalaCluster
@@ -349,3 +350,79 @@ class TestInsertFileExtension(ImpalaTestSuite):
     for path in self.filesystem_client.ls("test-warehouse/{0}.db/{1}".format(
         unique_database, table_name)):
       if not path.startswith('_'): assert path.endswith(file_extension)
+
+
+class TestInsertHdfsWriterLimit(ImpalaTestSuite):
+  """Test to make sure writer fragment instances are distributed evenly when using max
+      hdfs_writers query option."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestInsertHdfsWriterLimit, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        (v.get_value('table_format').file_format == 'parquet'))
+
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  def test_insert_writer_limit(self, unique_database):
+    # Root internal (non-leaf) fragment.
+    query = "create table {0}.test1 as select int_col from " \
+            "functional_parquet.alltypes".format(unique_database)
+    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0,
+                                           expected_num_instances_per_host=[1, 2, 2])
+    # Root coordinator fragment.
+    query = "create table {0}.test2 as select int_col from " \
+            "functional_parquet.alltypes limit 100000".format(unique_database)
+    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0,
+                                           expected_num_instances_per_host=[1, 1, 2])
+    # Root scan fragment. Instance count within limit.
+    query = "create table {0}.test3 as select int_col from " \
+            "functional_parquet.alltypes".format(unique_database)
+    self.__run_insert_and_verify_instances(query, max_fs_writers=4, mt_dop=0,
+                                           expected_num_instances_per_host=[1, 1, 1])
+
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  def test_mt_dop_writer_limit(self, unique_database):
+    # Root internal (non-leaf) fragment.
+    query = "create table {0}.test1 as select int_col from " \
+            "functional_parquet.alltypes".format(unique_database)
+    self.__run_insert_and_verify_instances(query, max_fs_writers=11, mt_dop=10,
+                                           expected_num_instances_per_host=[11, 12, 12])
+    # Root coordinator fragment.
+    query = "create table {0}.test2 as select int_col from " \
+            "functional_parquet.alltypes limit 100000".format(unique_database)
+    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=10,
+                                           expected_num_instances_per_host=[8, 8, 9])
+    # Root scan fragment. Instance count within limit.
+    query = "create table {0}.test3 as select int_col from " \
+            "functional_parquet.alltypes".format(unique_database)
+    self.__run_insert_and_verify_instances(query, max_fs_writers=30, mt_dop=10,
+                                           expected_num_instances_per_host=[8, 8, 8])
+
+  def __run_insert_and_verify_instances(self, query, max_fs_writers, mt_dop,
+                                        expected_num_instances_per_host):
+    self.client.set_configuration_option("max_fs_writers", max_fs_writers)
+    self.client.set_configuration_option("mt_dop", mt_dop)
+    # Test depends on both planner and scheduler to see the same state of the cluster
+    # having 3 executors, so to reduce flakiness we make sure all 3 executors are up
+    # and running.
+    self.impalad_test_service.wait_for_metric_value("cluster-membership.backends.total",
+                                                    3)
+    result = self.client.execute(query)
+    assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile
+    assert int(result.exec_summary[0]['num_instances']) <= int(
+      max_fs_writers), result.runtime_profile
+    regex = r'Per Host Number of Fragment Instances' \
+            r':.*?\((.*?)\).*?\((.*?)\).*?\((.*?)\).*?\n'
+    matches = re.findall(regex, result.runtime_profile)
+    assert len(matches) == 1 and len(matches[0]) == 3, result.runtime_profile
+    num_instances_per_host = [int(i) for i in matches[0]]
+    num_instances_per_host.sort()
+    expected_num_instances_per_host.sort()
+    assert num_instances_per_host == expected_num_instances_per_host, \
+      result.runtime_profile
+    self.client.clear_configuration()