You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bi...@apache.org on 2020/06/01 23:52:15 UTC

[impala] branch master updated: IMPALA-9655: Dynamic intra-node load balancing for HDFS scans

This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 052129c  IMPALA-9655: Dynamic intra-node load balancing for HDFS scans
052129c is described below

commit 052129c16a29891a72427b351bcc4e087d772fbe
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Mon Apr 27 19:14:23 2020 -0700

    IMPALA-9655: Dynamic intra-node load balancing for HDFS scans
    
    This patch ameliorates intra node execution skew for multithreaded
    HDFS scans by implementing a shared queue of scan ranges for all
    instances. Some points to highlight:
    - The scan node's PlanNode will go through all the TScanRanges
      assigned to all instances and aggregate them into file descriptors.
    - These files will be statically and evenly distributed among the
      instances.
    - The instances would then pick up their set of files and issue
      initial ranges by adding them to a shared queue.
    - Instances would then fetch their next range to read from this
      shared queue.
    - Other relevant data structures that will also be shared are:
     * remaining_scan_range_submissions_
     * partition_template_tuple_map_
     * per_file_metadata_
    - Removed the longest-processing time (LPT) algorithm in the scheduler
      which tries to distribute scan load among instances on a host. This
      will have no effect after this patch since now the ranges are shared.
    - Added missing lifecycle events from MT scan nodes.
    
    This approach guarantees the following:
    - All shared data structures are allocated at the FragmentState
      level ensuring they'll stick around till the query is closed.
    - All instance local buffers for reading a scan range will be
      allocated after it has been taken off the shared queue.
    - Since the scheduler can assign ranges from the same file to
      different instances, aggregating them into file descriptors early
      will ensure the initial footer or header range (depending on file
      format) for it will only be issued once.
    
    Limitation:
    - For HDFS MT scans we lose out on the optimization within
      ReaderContext which ensures cached ranges are read first.
    - As a compromise, ranges that are marked to use the hdfs
      cache are added to the front of the shared queue.
    
    Testing:
    - Added a regression test in test_mt_dop which would almost always
      fail without this patch
    - Added some test coverage for mt scans where parquet files are split
      across blocks and scanning tables with mixed file formats.
    - Ran core tests on ASAN build
    - Ran exhaustive tests
    
    Performance:
    
    No significant perf change.
    Ran TPCH (scale 30) with mt_dop set to 4 on a 3 node minicluster on my
    desktop.
    
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCH(30) | parquet / none / none | 5.85    | +0.35%     | 4.35       | +0.60%         |
    +----------+-----------------------+---------+------------+------------+----------------+
    
    +----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+
    | Workload | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
    +----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+
    | TPCH(30) | TPCH-Q15 | parquet / none / none | 4.46   | 4.30        |   +3.63%   |   1.64%   |   2.23%        | 30    |   +3.56%       | 5.28    | 7.09  |
    | TPCH(30) | TPCH-Q13 | parquet / none / none | 9.51   | 9.23        |   +3.12%   |   1.19%   |   0.79%        | 30    |   +2.84%       | 6.48    | 11.69 |
    | TPCH(30) | TPCH-Q2  | parquet / none / none | 2.37   | 2.32        |   +1.89%   |   3.76%   |   3.26%        | 30    |   +2.16%       | 1.97    | 2.06  |
    | TPCH(30) | TPCH-Q9  | parquet / none / none | 16.84  | 16.55       |   +1.77%   |   1.24%   |   0.57%        | 30    |   +1.62%       | 5.75    | 6.99  |
    | TPCH(30) | TPCH-Q19 | parquet / none / none | 3.65   | 3.60        |   +1.43%   |   2.19%   |   1.72%        | 30    |   +1.39%       | 2.53    | 2.79  |
    | TPCH(30) | TPCH-Q18 | parquet / none / none | 12.64  | 12.48       |   +1.33%   |   1.51%   |   1.53%        | 30    |   +1.28%       | 2.67    | 3.36  |
    | TPCH(30) | TPCH-Q14 | parquet / none / none | 2.94   | 2.91        |   +1.03%   |   1.91%   |   1.77%        | 30    |   +1.48%       | 1.98    | 2.16  |
    | TPCH(30) | TPCH-Q6  | parquet / none / none | 1.31   | 1.29        |   +1.44%   |   5.48%   |   3.66%        | 30    |   +0.43%       | 1.10    | 1.18  |
    | TPCH(30) | TPCH-Q7  | parquet / none / none | 5.49   | 5.44        |   +0.78%   |   0.59%   |   1.16%        | 30    |   +0.95%       | 3.49    | 3.27  |
    | TPCH(30) | TPCH-Q8  | parquet / none / none | 5.40   | 5.35        |   +0.83%   |   0.95%   |   0.95%        | 30    |   +0.83%       | 2.81    | 3.37  |
    | TPCH(30) | TPCH-Q20 | parquet / none / none | 2.82   | 2.81        |   +0.50%   |   1.58%   |   1.72%        | 30    |   +0.30%       | 1.22    | 1.16  |
    | TPCH(30) | TPCH-Q11 | parquet / none / none | 1.43   | 1.42        |   +0.64%   |   2.42%   |   1.96%        | 30    |   +0.09%       | 0.68    | 1.13  |
    | TPCH(30) | TPCH-Q4  | parquet / none / none | 2.59   | 2.58        |   +0.28%   |   1.54%   |   1.99%        | 30    |   +0.23%       | 1.17    | 0.60  |
    | TPCH(30) | TPCH-Q22 | parquet / none / none | 2.17   | 2.17        |   +0.15%   |   3.04%   |   3.68%        | 30    |   +0.15%       | 0.30    | 0.18  |
    | TPCH(30) | TPCH-Q16 | parquet / none / none | 2.02   | 2.02        |   +0.21%   |   2.73%   |   2.85%        | 30    |   +0.07%       | 0.26    | 0.30  |
    | TPCH(30) | TPCH-Q5  | parquet / none / none | 4.16   | 4.15        |   +0.04%   |   4.49%   |   4.38%        | 30    |   -0.03%       | -0.13   | 0.03  |
    | TPCH(30) | TPCH-Q3  | parquet / none / none | 4.75   | 4.76        |   -0.28%   |   1.34%   |   1.47%        | 30    |   -0.10%       | -0.57   | -0.77 |
    | TPCH(30) | TPCH-Q10 | parquet / none / none | 4.92   | 4.95        |   -0.52%   |   1.45%   |   2.71%        | 30    |   +0.03%       | 0.12    | -0.94 |
    | TPCH(30) | TPCH-Q12 | parquet / none / none | 2.82   | 2.85        |   -0.78%   |   1.50%   |   1.68%        | 30    |   -0.65%       | -1.66   | -1.91 |
    | TPCH(30) | TPCH-Q1  | parquet / none / none | 4.49   | 4.53        |   -0.97%   |   2.26%   |   2.69%        | 30    |   -0.68%       | -0.99   | -1.52 |
    | TPCH(30) | TPCH-Q17 | parquet / none / none | 10.11  | 10.19       |   -0.85%   |   3.27%   |   3.71%        | 30    |   -0.84%       | -0.85   | -0.94 |
    | TPCH(30) | TPCH-Q21 | parquet / none / none | 21.75  | 22.28       |   -2.37%   |   3.51%   |   7.00%        | 30    |   -0.88%       | -1.35   | -1.66 |
    +----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+-------+
    
    Change-Id: I9a101d0d98dff6e3779f85bc466e4c0bdb38094b
    Reviewed-on: http://gerrit.cloudera.org:8080/15926
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc        |  15 +-
 be/src/exec/hdfs-orc-scanner.h              |   2 +-
 be/src/exec/hdfs-scan-node-base.cc          | 456 ++++++++++++++++------------
 be/src/exec/hdfs-scan-node-base.h           | 286 ++++++++++++-----
 be/src/exec/hdfs-scan-node-mt.cc            |  38 ++-
 be/src/exec/hdfs-scan-node-mt.h             |  16 +-
 be/src/exec/hdfs-scan-node.cc               |  25 +-
 be/src/exec/hdfs-scan-node.h                |   5 +
 be/src/exec/hdfs-scanner.cc                 |   2 +-
 be/src/exec/hdfs-text-scanner.cc            |   2 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc |  20 +-
 be/src/exec/parquet/hdfs-parquet-scanner.h  |   5 +
 be/src/runtime/io/request-ranges.h          |   7 +
 be/src/runtime/io/scan-range.cc             |  15 +
 be/src/scheduling/scheduler-test.cc         |  40 +--
 be/src/scheduling/scheduler.cc              |  67 +---
 be/src/scheduling/scheduler.h               |  12 +-
 tests/query_test/test_mt_dop.py             |  45 +++
 tests/query_test/test_scanners.py           |  49 ++-
 19 files changed, 697 insertions(+), 410 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 65fd7d0..064ef4c 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -54,9 +54,6 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
   for (int i = 0; i < files.size(); ++i) {
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
-    ScanRangeMetadata* header_metadata =
-        scan_node->runtime_state()->obj_pool()->Add(new ScanRangeMetadata(*metadata));
-    header_metadata->is_sequence_header = true;
     int64_t header_size = min<int64_t>(HEADER_SIZE, files[i]->file_length);
     // The header is almost always a remote read. Set the disk id to -1 and indicate
     // it is not cached.
@@ -66,8 +63,12 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     int cache_options = !scan_node->IsDataCacheDisabled() ? BufferOpts::USE_DATA_CACHE :
         BufferOpts::NO_CACHING;
     ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
-        files[i]->filename.c_str(), header_size, 0, header_metadata, -1, expected_local,
-        files[i]->is_erasure_coded, files[i]->mtime, BufferOpts(cache_options));
+        files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
+        expected_local, files[i]->is_erasure_coded, files[i]->mtime,
+        BufferOpts(cache_options), metadata->original_split);
+    ScanRangeMetadata* header_metadata =
+            static_cast<ScanRangeMetadata*>(header_range->meta_data());
+    header_metadata->is_sequence_header = true;
     header_ranges.push_back(header_range);
   }
   // When the header is parsed, we will issue more AddDiskIoRanges in
@@ -170,7 +171,7 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
     // Header is parsed, set the metadata in the scan node and issue more ranges.
     static_cast<HdfsScanNodeBase*>(scan_node_)->SetFileMetadata(
         context_->partition_descriptor()->id(), stream_->filename(), header_);
-    HdfsFileDesc* desc = scan_node_->GetFileDesc(
+    const HdfsFileDesc* desc = scan_node_->GetFileDesc(
         context_->partition_descriptor()->id(), stream_->filename());
     // Issue the scan range with priority since it would result in producing a RowBatch.
     status = scan_node_->AddDiskIoRanges(desc, EnqueueLocation::HEAD);
@@ -329,7 +330,7 @@ Status BaseSequenceScanner::SkipToSync(const uint8_t* sync, int sync_size) {
 
 void BaseSequenceScanner::CloseFileRanges(const char* filename) {
   DCHECK(only_parsing_header_);
-  HdfsFileDesc* desc = scan_node_->GetFileDesc(
+  const HdfsFileDesc* desc = scan_node_->GetFileDesc(
       context_->partition_descriptor()->id(), filename);
   const vector<ScanRange*>& splits = desc->splits;
   for (int i = 0; i < splits.size(); ++i) {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index a95a13f..688dcce 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -119,7 +119,7 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
 
    private:
     HdfsOrcScanner* scanner_;
-    HdfsFileDesc* file_desc_;
+    const HdfsFileDesc* file_desc_;
     std::string filename_;
   };
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index bef92fd..b536fad 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -224,23 +224,137 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
 
-  const vector<const PlanFragmentInstanceCtxPB*>& instance_ctxs =
+  RETURN_IF_ERROR(ProcessScanRangesAndInitSharedState(state));
+
+  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
+  return Status::OK();
+}
+
+Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state) {
+  // Initialize the template tuple pool.
+  shared_state_.template_pool_.reset(new MemPool(state->query_mem_tracker()));
+  auto& template_tuple_map_ = shared_state_.partition_template_tuple_map_;
+  ObjectPool* obj_pool = shared_state_.obj_pool();
+  auto& file_descs = shared_state_.file_descs_;
+  HdfsFsCache::HdfsFsMap fs_cache;
+  int num_ranges_missing_volume_id = 0;
+  int64_t total_splits = 0;
+  const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
       state->instance_ctx_pbs();
-  for (auto ctx : instance_ctxs) {
-    auto ranges = ctx->per_node_scan_ranges().find(tnode.node_id);
+  for (auto ctx : instance_ctx_pbs) {
+    auto ranges = ctx->per_node_scan_ranges().find(tnode_->node_id);
     if (ranges == ctx->per_node_scan_ranges().end()) continue;
-    for (const ScanRangeParamsPB& scan_range_param : ranges->second.scan_ranges()) {
-      DCHECK(scan_range_param.scan_range().has_hdfs_file_split());
-      const HdfsFileSplitPB& split = scan_range_param.scan_range().hdfs_file_split();
+    for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
+      DCHECK(params.scan_range().has_hdfs_file_split());
+      const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
       HdfsPartitionDescriptor* partition_desc =
           hdfs_table_->GetPartition(split.partition_id());
-      scanned_file_formats_.insert(partition_desc->file_format());
+      if (template_tuple_map_.find(split.partition_id()) == template_tuple_map_.end()) {
+        template_tuple_map_[split.partition_id()] =
+            InitTemplateTuple(partition_desc->partition_key_value_evals(),
+                shared_state_.template_pool_.get());
+      }
+      // Convert the ScanRangeParamsPB into per-file DiskIO::ScanRange objects and
+      // populate partition_ids_, file_descs_, and per_type_files_.
+      if (partition_desc == nullptr) {
+        // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
+        LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
+                   << " partition_id=" << split.partition_id() << "\n"
+                   << PrintThrift(state->fragment())
+                   << state->fragment_ctx().DebugString();
+        return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
+                      " Try rerunning the query.");
+      }
+
+      filesystem::path file_path(partition_desc->location());
+      file_path.append(split.relative_path(), filesystem::path::codecvt());
+      const string& native_file_path = file_path.native();
+
+      auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
+      HdfsFileDesc* file_desc = nullptr;
+      auto file_desc_it = file_descs.find(file_desc_map_key);
+      if (file_desc_it == file_descs.end()) {
+        // Add new file_desc to file_descs_ and per_type_files_
+        file_desc = obj_pool->Add(new HdfsFileDesc(native_file_path));
+        file_descs[file_desc_map_key] = file_desc;
+        file_desc->file_length = split.file_length();
+        file_desc->mtime = split.mtime();
+        file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
+        file_desc->file_format = partition_desc->file_format();
+        file_desc->is_erasure_coded = split.is_erasure_coded();
+        RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+            native_file_path, &file_desc->fs, &fs_cache));
+        shared_state_.per_type_files_[partition_desc->file_format()].push_back(file_desc);
+      } else {
+        // File already processed
+        file_desc = file_desc_it->second;
+      }
+
+      bool expected_local = params.has_is_remote() && !params.is_remote();
+      if (expected_local && params.volume_id() == -1) ++num_ranges_missing_volume_id;
+
+      int cache_options = BufferOpts::NO_CACHING;
+      if (params.has_try_hdfs_cache() && params.try_hdfs_cache()) {
+        cache_options |= BufferOpts::USE_HDFS_CACHE;
+      }
+      if ((!expected_local || FLAGS_always_use_data_cache)
+          && !state->query_options().disable_data_cache) {
+        cache_options |= BufferOpts::USE_DATA_CACHE;
+      }
+      ScanRangeMetadata* metadata =
+          obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
+      file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
+          file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
+          params.volume_id(), expected_local, file_desc->is_erasure_coded,
+          file_desc->mtime, BufferOpts(cache_options)));
+      total_splits++;
     }
+    // Update server wide metrics for number of scan ranges and ranges that have
+    // incomplete metadata.
+    ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(ranges->second.scan_ranges().size());
+    ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
   }
-  state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
+  // Set up the rest of the shared state.
+  shared_state_.remaining_scan_range_submissions_.Store(instance_ctx_pbs.size());
+  shared_state_.progress().Init(
+      Substitute("Splits complete (node=$0)", tnode_->node_id), total_splits);
+  shared_state_.use_mt_scan_node_ = tnode_->hdfs_scan_node.use_mt_scan_node;
+
+  // Distribute the work evenly for issuing initial scan ranges.
+  DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1)
+      << "Non MT scan node should only have a single instance.";
+  auto instance_ctxs = state->instance_ctxs();
+  DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size());
+  int files_per_instance = file_descs.size() / instance_ctxs.size();
+  int remainder = file_descs.size() % instance_ctxs.size();
+  int num_lists = min(file_descs.size(), instance_ctxs.size());
+  auto fd_it = file_descs.begin();
+  for (int i = 0; i < num_lists; ++i) {
+    vector<HdfsFileDesc*>* curr_file_list =
+        &shared_state_
+             .file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id];
+    for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
+      curr_file_list->push_back(fd_it->second);
+      ++fd_it;
+    }
+  }
+  DCHECK(fd_it == file_descs.end());
   return Status::OK();
 }
 
+Tuple* HdfsScanPlanNode::InitTemplateTuple(
+    const std::vector<ScalarExprEvaluator*>& evals, MemPool* pool) const {
+  if (partition_key_slots_.empty()) return nullptr;
+  Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool);
+  for (int i = 0; i < partition_key_slots_.size(); ++i) {
+    const SlotDescriptor* slot_desc = partition_key_slots_[i];
+    ScalarExprEvaluator* eval = evals[slot_desc->col_pos()];
+    // Exprs guaranteed to be literals, so can safely be evaluated without a row.
+    RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, nullptr);
+  }
+  return template_tuple;
+}
+
 int HdfsScanPlanNode::GetMaterializedSlotIdx(const std::vector<int>& path) const {
   auto result = path_to_materialized_slot_idx_.find(path);
   if (result == path_to_materialized_slot_idx_.end()) {
@@ -257,6 +371,9 @@ void HdfsScanPlanNode::Close() {
     ScalarExpr::Close(tid_conjunct.second);
   }
   ScalarExpr::Close(min_max_conjuncts_);
+  if (shared_state_.template_pool_.get() != nullptr) {
+    shared_state_.template_pool_->FreeAll();
+  }
   PlanNode::Close();
 }
 
@@ -299,7 +416,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno
     materialized_slots_(pnode.materialized_slots_),
     partition_key_slots_(pnode.partition_key_slots_),
     disks_accessed_bitmap_(TUnit::UNIT, 0),
-    active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {}
+    active_hdfs_read_thread_counter_(TUnit::UNIT, 0),
+    shared_state_(const_cast<ScanRangeSharedState*>(&(pnode.shared_state_))) {}
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {}
 
@@ -327,75 +445,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
         expr_perm_pool(), expr_results_pool(), &min_max_conjunct_evals_));
   }
 
-  // One-time initialization of state that is constant across scan ranges
-  scan_node_pool_.reset(new MemPool(mem_tracker()));
-
-  HdfsFsCache::HdfsFsMap fs_cache;
-  // Convert the ScanRangeParamsPB into per-file DiskIO::ScanRange objects and populate
-  // partition_ids_, file_descs_, and per_type_files_.
-  DCHECK(scan_range_params_ != NULL)
-      << "Must call SetScanRanges() before calling Prepare()";
-  int num_ranges_missing_volume_id = 0;
-  for (const ScanRangeParamsPB& params : *scan_range_params_) {
-    DCHECK(params.scan_range().has_hdfs_file_split());
-    const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
-    partition_ids_.insert(split.partition_id());
-    HdfsPartitionDescriptor* partition_desc =
-        hdfs_table_->GetPartition(split.partition_id());
-    if (partition_desc == NULL) {
-      // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
-      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-                 << " partition_id=" << split.partition_id() << "\n"
-                 << PrintThrift(state->instance_ctx()) << "\n"
-                 << state->instance_ctx_pb().DebugString();
-      return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
-                    " Try rerunning the query.");
-    }
-
-    filesystem::path file_path(partition_desc->location());
-    file_path.append(split.relative_path(), filesystem::path::codecvt());
-    const string& native_file_path = file_path.native();
-
-    auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
-    HdfsFileDesc* file_desc = NULL;
-    FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key);
-    if (file_desc_it == file_descs_.end()) {
-      // Add new file_desc to file_descs_ and per_type_files_
-      file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
-      file_descs_[file_desc_map_key] = file_desc;
-      file_desc->file_length = split.file_length();
-      file_desc->mtime = split.mtime();
-      file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
-      file_desc->is_erasure_coded = split.is_erasure_coded();
-      RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-          native_file_path, &file_desc->fs, &fs_cache));
-      per_type_files_[partition_desc->file_format()].push_back(file_desc);
-    } else {
-      // File already processed
-      file_desc = file_desc_it->second;
-    }
-
-    bool expected_local = params.has_is_remote() && !params.is_remote();
-    if (expected_local && params.volume_id() == -1) ++num_ranges_missing_volume_id;
-
-    int cache_options = BufferOpts::NO_CACHING;
-    if (params.has_try_hdfs_cache() && params.try_hdfs_cache()) {
-      cache_options |= BufferOpts::USE_HDFS_CACHE;
-    }
-    if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) {
-      cache_options |= BufferOpts::USE_DATA_CACHE;
-    }
-    file_desc->splits.push_back(
-        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length(),
-            split.offset(), split.partition_id(), params.volume_id(), expected_local,
-            file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options)));
-  }
-
-  // Update server wide metrics for number of scan ranges and ranges that have
-  // incomplete metadata.
-  ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
-  ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
-
   // Check if reservation was enough to allocate at least one buffer. The
   // reservation calculation in HdfsScanNode.java should guarantee this.
   // Hitting this error indicates a misconfiguration or bug.
@@ -406,13 +455,19 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
       Substitute("HDFS scan min reservation $0 must be >= min buffer size $1",
        resource_profile_.min_reservation, min_buffer_size));
   }
-  // Add per volume stats to the runtime profile
-  PerVolumeStats per_volume_stats;
-  stringstream str;
-  UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
-  PrintHdfsSplitStats(per_volume_stats, &str);
+
+  // One-time initialization of state that is constant across scan ranges
+  scan_node_pool_.reset(new MemPool(mem_tracker()));
   runtime_profile()->AddInfoString("Table Name", hdfs_table_->fully_qualified_name());
-  runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
+
+  if (HasRowBatchQueue()) {
+    // Add per volume stats to the runtime profile for Non MT scan node.
+    PerVolumeStats per_volume_stats;
+    stringstream str;
+    UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
+    PrintHdfsSplitStats(per_volume_stats, &str);
+    runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
+  }
   return Status::OK();
 }
 
@@ -420,10 +475,10 @@ void HdfsScanPlanNode::Codegen(FragmentState* state) {
   DCHECK(state->ShouldCodegen());
   PlanNode::Codegen(state);
   if (IsNodeCodegenDisabled()) return;
-  for (THdfsFileFormat::type format: scanned_file_formats_) {
+  for (auto& elem: shared_state_.per_type_files_) {
     llvm::Function* fn;
     Status status;
-    switch (format) {
+    switch (elem.first) {
       case THdfsFileFormat::TEXT:
         status = HdfsTextScanner::Codegen(this, state, &fn);
         break;
@@ -443,12 +498,12 @@ void HdfsScanPlanNode::Codegen(FragmentState* state) {
         status = Status::Expected("Not implemented for this format.");
     }
     DCHECK(fn != NULL || !status.ok());
-    const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
+    const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(elem.first)->second;
     if (status.ok()) {
       LlvmCodeGen* codegen = state->codegen();
       DCHECK(codegen != NULL);
       codegen->AddFunctionToJit(
-          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
+          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(elem.first)]);
     }
     AddCodegenStatus(status, format_name);
   }
@@ -467,16 +522,6 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   // Open min max conjuncts
   RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state));
 
-  // Create template tuples for all partitions.
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
-                                   << " partition_id=" << partition_id
-                                   << "\n" << PrintThrift(state->instance_ctx());
-    partition_template_tuple_map_[partition_id] = InitTemplateTuple(
-        partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
-  }
-
   RETURN_IF_ERROR(ClaimBufferReservation(state));
   reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext();
 
@@ -552,10 +597,6 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
       ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1);
 
   counters_running_ = true;
-
-  int64_t total_splits = 0;
-  for (const auto& fd: file_descs_) total_splits += fd.second->splits.size();
-  progress_.Init(Substitute("Splits complete (node=$0)", id_), total_splits);
   return Status::OK();
 }
 
@@ -607,29 +648,30 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
 
   if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
   // Apply dynamic partition-pruning per-file.
-  FileFormatsMap matching_per_type_files;
-  for (const FileFormatsMap::value_type& v: per_type_files_) {
-    vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first];
-    for (HdfsFileDesc* file: v.second) {
-      if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) {
-        matching_files->push_back(file);
-      } else {
-        SkipFile(v.first, file);
-      }
+  HdfsFileDesc::FileFormatsMap matching_per_type_files;
+  std::vector<HdfsFileDesc*>* file_list =
+      shared_state_->GetFilesForIssuingScanRangesForInstance(
+          runtime_state_->instance_ctx().fragment_instance_id);
+  if (file_list == nullptr) return Status::OK();
+  for (HdfsFileDesc* file : *file_list) {
+    if (FilePassesFilterPredicates(filter_ctxs_, file->file_format, file)) {
+      matching_per_type_files[file->file_format].push_back(file);
+    } else {
+      SkipFile(file->file_format, file);
     }
-    // Randomize the order this node processes the files. We want to do this to avoid
-    // issuing remote reads to the same DN from different impalads. In file formats such
-    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
-    // If every node goes through the files in the same order, all the remote reads are
-    // for the same file meaning a few DN serves a lot of remote reads at the same time.
-    random_shuffle(matching_files->begin(), matching_files->end());
   }
 
   // Issue initial ranges for all file types. Only call functions for file types that
   // actually exist - trying to add empty lists of ranges can result in spurious
   // CANCELLED errors - see IMPALA-6564.
-  for (const auto& entry : matching_per_type_files) {
+  for (auto& entry : matching_per_type_files) {
     if (entry.second.empty()) continue;
+    // Randomize the order this node processes the files. We want to do this to avoid
+    // issuing remote reads to the same DN from different impalads. In file formats such
+    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
+    // If every node goes through the files in the same order, all the remote reads are
+    // for the same file meaning a few DN serves a lot of remote reads at the same time.
+    random_shuffle(entry.second.begin(), entry.second.end());
     switch (entry.first) {
       case THdfsFileFormat::PARQUET:
         RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second));
@@ -680,7 +722,7 @@ void HdfsScanNodeBase::SkipScanRange(io::ScanRange* scan_range) {
   DCHECK(partition != nullptr) << "table_id=" << hdfs_table_->id()
                                << " partition_id=" << partition_id << "\n"
                                << PrintThrift(runtime_state_->instance_ctx());
-  HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
+  const HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
   if (metadata->is_sequence_header) {
     // File ranges haven't been issued yet, skip entire file.
     UpdateRemainingScanRangeSubmissions(-1);
@@ -698,7 +740,7 @@ Status HdfsScanNodeBase::StartNextScanRange(const std::vector<FilterContext>& fi
   bool needs_buffers;
   // Loop until we've got a scan range or run out of ranges.
   do {
-    RETURN_IF_ERROR(reader_context_->GetNextUnstartedRange(scan_range, &needs_buffers));
+    RETURN_IF_ERROR(GetNextScanRangeToRead(scan_range, &needs_buffers));
     if (*scan_range == nullptr) return Status::OK();
     if (filter_ctxs.size() > 0) {
       int64_t partition_id =
@@ -751,16 +793,8 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
     bool is_erasure_coded, int64_t mtime,  const BufferOpts& buffer_opts,
     const ScanRange* original_split) {
-  ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
-        new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
-      is_erasure_coded, mtime, buffer_opts);
-}
-
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
-    int64_t len, int64_t offset, ScanRangeMetadata* metadata, int disk_id,
-    bool expected_local, bool is_erasure_coded, int64_t mtime,
-    const BufferOpts& buffer_opts) {
+  ScanRangeMetadata* metadata =
+      shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
   return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
       is_erasure_coded, mtime, buffer_opts);
 }
@@ -769,32 +803,25 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
     int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
     int64_t partition_id, int disk_id, bool expected_local, bool is_erasure_coded,
     int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) {
-  ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
-      new ScanRangeMetadata(partition_id, original_split));
+  ScanRangeMetadata* metadata =
+      shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
   return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
       disk_id, expected_local, is_erasure_coded, mtime, buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
-    int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
-    ScanRangeMetadata* metadata, int disk_id, bool expected_local, bool is_erasure_coded,
-    int64_t mtime, const BufferOpts& buffer_opts) {
-  DCHECK_GE(disk_id, -1);
+ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+    int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
+    int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
+    const BufferOpts& buffer_opts) {
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
   // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
   // beyond the end of the file).
-  DCHECK_GE(offset, 0);
-  DCHECK_GE(len, 0);
   DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
-  disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
-      file, disk_id, expected_local);
-
-  ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
-  range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
-      buffer_opts, move(sub_ranges), metadata);
-  return range;
+  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
+      move(sub_ranges), metadata, disk_id, expected_local, is_erasure_coded, mtime,
+      buffer_opts);
 }
 
 ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
@@ -805,37 +832,6 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
       is_erasure_coded, mtime, BufferOpts(cache_options), original_split);
 }
 
-Status HdfsScanNodeBase::AddDiskIoRanges(const vector<ScanRange*>& ranges,
-    EnqueueLocation enqueue_location) {
-  DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
-  DCHECK_GT(remaining_scan_range_submissions_.Load(), 0);
-  DCHECK_GT(ranges.size(), 0);
-  return reader_context_->AddScanRanges(ranges, enqueue_location);
-}
-
-HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(int64_t partition_id, const string& filename) {
-  auto file_desc_map_key = make_pair(partition_id, filename);
-  DCHECK(file_descs_.find(file_desc_map_key) != file_descs_.end());
-  return file_descs_[file_desc_map_key];
-}
-
-void HdfsScanNodeBase::SetFileMetadata(
-    int64_t partition_id, const string& filename, void* metadata) {
-  unique_lock<mutex> l(metadata_lock_);
-  auto file_metadata_map_key = make_pair(partition_id, filename);
-  DCHECK(per_file_metadata_.find(file_metadata_map_key) == per_file_metadata_.end());
-  per_file_metadata_[file_metadata_map_key] = metadata;
-}
-
-void* HdfsScanNodeBase::GetFileMetadata(
-    int64_t partition_id, const string& filename) {
-  unique_lock<mutex> l(metadata_lock_);
-  auto file_metadata_map_key = make_pair(partition_id, filename);
-  auto it = per_file_metadata_.find(file_metadata_map_key);
-  if (it == per_file_metadata_.end()) return NULL;
-  return it->second;
-}
-
 void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
   auto it = codegend_fn_map_.find(type);
   if (it == codegend_fn_map_.end()) return NULL;
@@ -888,19 +884,6 @@ Status HdfsScanNodeBase::CreateAndOpenScannerHelper(HdfsPartitionDescriptor* par
   return ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
 }
 
-Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals,
-    MemPool* pool, RuntimeState* state) const {
-  if (partition_key_slots_.empty()) return NULL;
-  Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool);
-  for (int i = 0; i < partition_key_slots_.size(); ++i) {
-    const SlotDescriptor* slot_desc = partition_key_slots_[i];
-    ScalarExprEvaluator* eval = evals[slot_desc->col_pos()];
-    // Exprs guaranteed to be literals, so can safely be evaluated without a row.
-    RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, NULL);
-  }
-  return template_tuple;
-}
-
 void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_desc,
     Tuple* tuple) const {
   for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) {
@@ -939,7 +922,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
   if (FilterContext::CheckForAlwaysFalse(stats_name, filter_ctxs)) return false;
   DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
       << "Mismatched number of filter contexts";
-  Tuple* template_tuple = partition_template_tuple_map_[partition_id];
+  Tuple* template_tuple = GetTemplateTupleForPartitionId(partition_id);
   // Defensive - if template_tuple is NULL, there can be no filters on partition columns.
   if (template_tuple == nullptr) return true;
   TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
@@ -968,7 +951,7 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
 void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
     const vector<THdfsCompression::type>& compression_types, bool skipped) {
   scan_ranges_complete_counter_->Add(1);
-  progress_.Update(1);
+  shared_state_->progress().Update(1);
   HdfsCompressionTypesSet compression_set;
   for (int i = 0; i < compression_types.size(); ++i) {
     compression_set.AddType(compression_types[i]);
@@ -977,7 +960,7 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
 }
 
 void HdfsScanNodeBase::SkipFile(const THdfsFileFormat::type& file_type,
-    HdfsFileDesc* file) {
+    const HdfsFileDesc* file) {
   for (int i = 0; i < file->splits.size(); ++i) {
     RangeComplete(file_type, file->file_compression, true);
   }
@@ -1182,3 +1165,90 @@ void HdfsScanNodeBase::UpdateBytesRead(
     bytes_read_per_col_[slot_id].compressed_bytes_read.Add(compressed_bytes_read);
   }
 }
+
+HdfsFileDesc* ScanRangeSharedState::GetFileDesc(
+    int64_t partition_id, const std::string& filename) {
+  auto file_desc_map_key = make_pair(partition_id, filename);
+  DCHECK(file_descs_.find(file_desc_map_key) != file_descs_.end());
+  return file_descs_[file_desc_map_key];
+}
+
+void ScanRangeSharedState::SetFileMetadata(
+    int64_t partition_id, const string& filename, void* metadata) {
+  unique_lock<mutex> l(metadata_lock_);
+  auto file_metadata_map_key = make_pair(partition_id, filename);
+  DCHECK(per_file_metadata_.find(file_metadata_map_key) == per_file_metadata_.end());
+  per_file_metadata_[file_metadata_map_key] = metadata;
+}
+
+void* ScanRangeSharedState::GetFileMetadata(
+    int64_t partition_id, const string& filename) {
+  unique_lock<mutex> l(metadata_lock_);
+  auto file_metadata_map_key = make_pair(partition_id, filename);
+  auto it = per_file_metadata_.find(file_metadata_map_key);
+  if (it == per_file_metadata_.end()) return NULL;
+  return it->second;
+}
+
+Tuple* ScanRangeSharedState::GetTemplateTupleForPartitionId(int64_t partition_id) {
+  DCHECK(partition_template_tuple_map_.find(partition_id)
+      != partition_template_tuple_map_.end());
+  return partition_template_tuple_map_[partition_id];
+}
+
+void ScanRangeSharedState::UpdateRemainingScanRangeSubmissions(int32_t delta) {
+  int new_val = remaining_scan_range_submissions_.Add(delta);
+  DCHECK_GE(new_val, 0);
+  if (!use_mt_scan_node_) return;
+  if (new_val == 0) {
+    // Last thread has added its ranges. Acquire lock so that no waiting thread misses the
+    // last notify.
+    std::unique_lock<std::mutex> l(scan_range_submission_lock_);
+  }
+  range_submission_cv_.NotifyAll();
+}
+
+void ScanRangeSharedState::EnqueueScanRange(
+    const vector<ScanRange*>& ranges, bool at_front) {
+  DCHECK(use_mt_scan_node_) << "Should only be called by MT scan nodes";
+  if (!at_front) {
+    for (ScanRange* scan_range : ranges) {
+      if(scan_range->UseHdfsCache()){
+        scan_range_queue_.PushFront(scan_range);
+        continue;
+      }
+      scan_range_queue_.Enqueue(scan_range);
+    }
+  } else {
+    for (ScanRange* scan_range : ranges) {
+      scan_range_queue_.PushFront(scan_range);
+    }
+  }
+}
+
+Status ScanRangeSharedState::GetNextScanRange(
+    RuntimeState* state, ScanRange** scan_range) {
+  DCHECK(use_mt_scan_node_) << "Should only be called by MT scan nodes";
+  while (true) {
+    *scan_range = scan_range_queue_.Dequeue();
+    if (*scan_range != nullptr) return Status::OK();
+    {
+      unique_lock<mutex> l(scan_range_submission_lock_);
+      while (scan_range_queue_.empty() && remaining_scan_range_submissions_.Load() > 0
+          && !state->is_cancelled()) {
+        range_submission_cv_.Wait(l);
+      }
+    }
+    // No more work to do.
+    if (scan_range_queue_.empty() && remaining_scan_range_submissions_.Load() == 0) {
+      break;
+    }
+    if (state->is_cancelled()) return Status::CANCELLED;
+  }
+  return Status::OK();
+}
+
+void ScanRangeSharedState::AddCancellationHook(RuntimeState* state) {
+  DCHECK(use_mt_scan_node_) << "Should only be called by MT scan nodes";
+  state->AddCancellationCV(&scan_range_submission_lock_, &range_submission_cv_);
+}
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3d88706..859c25d 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -38,25 +38,31 @@
 #include "util/container-util.h"
 #include "util/progress-updater.h"
 #include "util/spinlock.h"
+#include "util/unique-id-hash.h"
 
 namespace impala {
 
 class ScannerContext;
 class DescriptorTbl;
 class HdfsScanner;
+class HdfsScanPlanNode;
 class RowBatch;
 class Status;
 class Tuple;
 class TPlanNode;
 class TScanRange;
 
-/// Maintains per file information for files assigned to this scan node.  This includes
+/// Maintains per file information for files assigned to this scan node. This includes
 /// all the splits for the file. Note that it is not thread-safe.
 struct HdfsFileDesc {
   HdfsFileDesc(const std::string& filename)
-    : fs(NULL), filename(filename), file_length(0), mtime(0),
-      file_compression(THdfsCompression::NONE), is_erasure_coded(false) {
-  }
+    : fs(NULL),
+      filename(filename),
+      file_length(0),
+      mtime(0),
+      file_compression(THdfsCompression::NONE),
+      file_format(THdfsFileFormat::TEXT),
+      is_erasure_coded(false) {}
 
   /// Connection to the filesystem containing the file.
   hdfsFS fs;
@@ -72,12 +78,22 @@ struct HdfsFileDesc {
   int64_t mtime;
 
   THdfsCompression::type file_compression;
+  THdfsFileFormat::type file_format;
 
   /// is erasure coded
   bool is_erasure_coded;
 
   /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
   std::vector<io::ScanRange*> splits;
+
+  /// Some useful typedefs for creating HdfsFileDesc related data structures.
+  /// This is a pair for partition ID and filename which uniquely identifies a file.
+  typedef pair<int64_t, std::string> PartitionFileKey;
+  /// Partition_id, File path => file descriptor
+  typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap;
+  /// File format => file descriptors.
+  typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>>
+    FileFormatsMap;
 };
 
 /// Struct for additional metadata for scan ranges. This contains the partition id
@@ -99,6 +115,147 @@ struct ScanRangeMetadata {
       : partition_id(partition_id), original_split(original_split) { }
 };
 
+/// Encapsulated all mutable state related to scan ranges that is shared across all scan
+/// node instances. Maintains the following context that can be accessed by all instances
+/// in a thread safe manner:
+/// - File Descriptors
+/// - File Metadata
+/// - Template Tuples for partitions
+/// - ProgressUpdater keeping tracks of how many scan ranges have been read.
+/// - Counter tracking remaining scan range submissions
+/// - API for fetching and adding a scan range to the queue.
+///
+/// An instance of this can be created and initialized exclusively by HdfsScanPlanNode.
+class ScanRangeSharedState {
+ public:
+  /// Given a partition_id and filename returns the related file descriptor DCHECK ensures
+  /// there is always file descriptor returned.
+  /// TODO: The LZO scanner expects a non const object so switch to returning a const once
+  /// support for LZO scanner is removed.
+  HdfsFileDesc* GetFileDesc(int64_t partition_id, const std::string& filename);
+
+  /// Sets the scanner specific metadata for 'partition_id' and 'filename'.
+  /// Scanners can use this to store file header information. Thread safe.
+  void SetFileMetadata(
+      int64_t partition_id, const std::string& filename, void* metadata);
+
+  /// Returns the scanner specific metadata for 'partition_id' and 'filename'.
+  /// Returns nullptr if there is no metadata. Thread safe.
+  void* GetFileMetadata(int64_t partition_id, const std::string& filename);
+
+  /// Get the template tuple which has only the partition columns materialized for the
+  /// partition identified by 'partition_id' .
+  Tuple* GetTemplateTupleForPartitionId(int64_t partition_id);
+
+  ObjectPool* obj_pool() { return &obj_pool_; }
+  ProgressUpdater& progress() { return progress_; }
+  HdfsFileDesc::FileFormatsMap& per_type_files() { return per_type_files_; }
+
+  /// Updates the counter keeping track of remaining scan range submissions. If MT scan
+  /// nodes are being used, it notifies all instances waiting in GetNextScanRange() every
+  /// time it updates.
+  void UpdateRemainingScanRangeSubmissions(int32_t delta);
+
+  /// Returns the number of remaining scan range submissions.
+  int RemainingScanRangeSubmissions() {
+    return remaining_scan_range_submissions_.Load();
+  }
+
+  /// Returns the list of files assigned to the fragment with id 'fragment_instance_id'
+  /// for which initial scan ranges need to be issued. Returns nullptr if the instance
+  /// has not been assigned anything.
+  std::vector<HdfsFileDesc*>* GetFilesForIssuingScanRangesForInstance(
+      const TUniqueId fragment_instance_id) {
+    auto it = file_assignment_per_instance_.find(fragment_instance_id);
+    if (it == file_assignment_per_instance_.end()) return nullptr;
+    return &it->second;
+  }
+
+  /// The following public methods are only used by MT scan nodes.
+
+  /// Adds all scan ranges to the queue. If 'at_front' is true or the range has
+  /// USE_HDFS_CACHE option set, then adds it to the front of the queue.
+  void EnqueueScanRange(const std::vector<io::ScanRange*>& ranges, bool at_front);
+
+  /// Sets a reference to the next scan range in input variable 'scan_range' from a queue
+  /// of scan ranges that need to be read. Blocks if there are remaining scan range
+  /// submissions and the queue is empty. Unblocks and returns CANCELLED status in case
+  /// the query was cancelled. 'scan_range' is set to nullptr if no more scan ranges are
+  /// left to read.
+  Status GetNextScanRange(RuntimeState* state, io::ScanRange** scan_range);
+
+  /// Add the required hooks to the runtime state that gets triggered in case of
+  /// cancellation. Must be called before adding or removing scan ranges to the queue.
+  void AddCancellationHook(RuntimeState* state);
+
+ private:
+  friend class HdfsScanPlanNode;
+
+  ScanRangeSharedState() = default;
+  DISALLOW_COPY_AND_ASSIGN(ScanRangeSharedState);
+
+  /// Contains all the file descriptors and the scan ranges created.
+  ObjectPool obj_pool_;
+
+  /// For storing partition template tuples.
+  std::unique_ptr<MemPool> template_pool_;
+
+  /// partition_id, File path => file descriptor (which includes the file's splits).
+  /// Populated in HdfsScanPlanNode::Init() after which it is never modified.
+  HdfsFileDesc::FileDescMap file_descs_;
+
+  /// Scanner specific per file metadata (e.g. header information) and associated lock.
+  /// Currently only used by sequence scanners.
+  /// Key of the map is partition_id, filename pair
+  std::mutex metadata_lock_;
+  std::unordered_map<HdfsFileDesc::PartitionFileKey, void*, pair_hash> per_file_metadata_;
+
+  /// Map from partition ID to a template tuple (owned by template_pool_) which has only
+  /// the partition columns for that partition materialized. Used to filter files and scan
+  /// ranges on partition-column filters. Populated in HdfsScanPlanNode::Init().
+  boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
+
+  /// File format => file descriptors.
+  HdfsFileDesc::FileFormatsMap per_type_files_;
+
+  /// Contains a list of files for every instance equally divided among all. Indexed by
+  /// fragment instance id. Might not contain an entry for all instances in case there are
+  /// not enough files to be read.
+  std::unordered_map<TUniqueId, std::vector<HdfsFileDesc*>> file_assignment_per_instance_;
+
+  /// Keeps track of total splits remaining to be read.
+  ProgressUpdater progress_;
+
+  /// When this counter drops to 0, AddDiskIoRanges() should not be called again, and
+  /// therefore scanner threads (for non-MT) or fragment instances (for MT) can't get work
+  /// should exit. For most file formats (except for sequence-based formats), this is 0
+  /// after IssueInitialRanges() has been called by all fragment instances. Note that some
+  /// scanners (namely Parquet) issue additional work to the IO subsystem without using
+  /// AddDiskIoRanges(), but that is managed within the scanner, and doesn't require
+  /// additional scanner threads.
+  /// Initialized the number of instances for this fragment in HdfsScanPlanNode::Init().
+  AtomicInt32 remaining_scan_range_submissions_{0};
+
+  /// Indicates whether MT scan nodes are being used.
+  bool use_mt_scan_node_ = false;
+
+  /// The following are only used by MT scan nodes.
+  /////////////////////////////////////////////////////////////////////
+  /// BEGIN: Members that are used only by MT scan nodes(use_mt_scan_node_ is true).
+
+  /// Lock synchronizes access for calling wait on the condition variable.
+  std::mutex scan_range_submission_lock_;
+  /// Used by scan instances to wait for remaining scan range submission
+  ConditionVariable range_submission_cv_;
+
+  /// Queue of all scan ranges that need to be read. Shared by all instances of this
+  /// fragment. Only used for MT scans.
+  InternalQueue<io::ScanRange> scan_range_queue_;
+
+  /// END: Members that are used only by MT scan nodes(use_mt_scan_node_ is true).
+  /////////////////////////////////////////////////////////////////////
+};
+
 class HdfsScanPlanNode : public ScanPlanNode {
  public:
   virtual Status Init(const TPlanNode& tnode, FragmentState* state) override;
@@ -157,11 +314,21 @@ class HdfsScanPlanNode : public ScanPlanNode {
   /// The root of the table's Avro schema, if we're scanning an Avro table.
   ScopedAvroSchemaElement avro_schema_;
 
-  /// File formats that instances of this node will read.
-  boost::unordered_set<THdfsFileFormat::type> scanned_file_formats_;
-
   /// Per scanner type codegen'd fn.
   boost::unordered_map<THdfsFileFormat::type, void*> codegend_fn_map_;
+
+  /// State related to scan ranges shared across all scan node instances.
+  ScanRangeSharedState shared_state_;
+
+  /// Allocates and initializes a new template tuple allocated from pool with values
+  /// from the partition columns for the current scan range, if any,
+  /// Returns nullptr if there are no partition keys slots.
+  Tuple* InitTemplateTuple(
+      const std::vector<ScalarExprEvaluator*>& evals, MemPool* pool) const;
+
+  /// Processes all the scan range params for this scan node to create all the required
+  /// file descriptors, update metrics and fill in all relevant state in 'shared_state_'.
+  Status ProcessScanRangesAndInitSharedState(FragmentState* state);
 };
 
 /// Base class for all Hdfs scan nodes. Contains common members and functions
@@ -302,7 +469,7 @@ class HdfsScanNodeBase : public ScanNode {
   inline void IncNumScannersCodegenEnabled() { num_scanners_codegen_enabled_.Add(1); }
   inline void IncNumScannersCodegenDisabled() { num_scanners_codegen_disabled_.Add(1); }
 
-  /// Allocate a new scan range object, stored in the runtime state's object pool. For
+  /// Allocate a new scan range object, stored in the fragment state's object pool. For
   /// scan ranges that correspond to the original hdfs splits, the partition id must be
   /// set to the range's partition id. Partition_id is mandatory as it is used to gather
   /// file descriptor info. expected_local should be true if this scan range is not
@@ -317,12 +484,6 @@ class HdfsScanNodeBase : public ScanNode {
       const io::BufferOpts& buffer_opts,
       const io::ScanRange* original_split = nullptr);
 
-  /// Same as above, but it takes a pointer to a ScanRangeMetadata object which contains
-  /// the partition_id, original_splits, and other information about the scan range.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
-      int64_t offset, ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-      bool is_erasure_coded, int64_t mtime, const io::BufferOpts& buffer_opts);
-
   /// Same as the first overload, but it takes sub-ranges as well.
   io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
@@ -342,15 +503,14 @@ class HdfsScanNodeBase : public ScanNode {
       bool expected_local, int64_t mtime, bool is_erasure_coded = false,
       const io::ScanRange* original_split = nullptr);
 
-  /// Adds ranges to the io mgr queue. Can be overridden to add scan-node specific
-  /// actions like starting scanner threads. Must not be called once
-  /// remaining_scan_range_submissions_ is 0.
-  /// The enqueue_location specifies whether the scan ranges are added to the head or
-  /// tail of the queue.
+  /// Adds ranges to be read later by scanners. Must not be called once
+  /// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
+  /// scan ranges are added to the head or tail of the queue. Implemented by child classes
+  /// to add ranges to their implementation of a queue that is used to organize ranges.
   virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
-      EnqueueLocation enqueue_location = EnqueueLocation::TAIL) WARN_UNUSED_RESULT;
+      EnqueueLocation enqueue_location = EnqueueLocation::TAIL) = 0;
 
-  /// Adds all splits for file_desc to the io mgr queue.
+  /// Adds all splits for file_desc to be read later by scanners.
   inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc,
       EnqueueLocation enqueue_location = EnqueueLocation::TAIL) WARN_UNUSED_RESULT {
     return AddDiskIoRanges(file_desc->splits);
@@ -360,27 +520,28 @@ class HdfsScanNodeBase : public ScanNode {
   /// Furthermore, this implies that scanner threads failing to
   /// acquire a new scan range with StartNextScanRange() can exit.
   inline void UpdateRemainingScanRangeSubmissions(int32_t delta) {
-    remaining_scan_range_submissions_.Add(delta);
-    DCHECK_GE(remaining_scan_range_submissions_.Load(), 0);
+    shared_state_->UpdateRemainingScanRangeSubmissions(delta);
   }
 
-  /// Allocates and initializes a new template tuple allocated from pool with values
-  /// from the partition columns for the current scan range, if any,
-  /// Returns NULL if there are no partition keys slots.
-  Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals,
-      MemPool* pool, RuntimeState* state) const;
-
   /// Given a partition_id and filename returns the related file descriptor
   /// DCHECK ensures there is always file descriptor returned
-  HdfsFileDesc* GetFileDesc(int64_t partition_id, const std::string& filename);
+  inline HdfsFileDesc* GetFileDesc(
+      int64_t partition_id, const std::string& filename) {
+    return shared_state_->GetFileDesc(partition_id, filename);
+  }
 
   /// Sets the scanner specific metadata for 'partition_id' and 'filename'.
   /// Scanners can use this to store file header information. Thread safe.
-  void SetFileMetadata(int64_t partition_id, const std::string& filename, void* metadata);
+  inline void SetFileMetadata(
+      int64_t partition_id, const std::string& filename, void* metadata) {
+    shared_state_->SetFileMetadata(partition_id, filename, metadata);
+  }
 
   /// Returns the scanner specific metadata for 'partition_id' and 'filename'.
   /// Returns nullptr if there is no metadata. Thread safe.
-  void* GetFileMetadata(int64_t partition_id, const std::string& filename);
+  inline void* GetFileMetadata(int64_t partition_id, const std::string& filename) {
+    return shared_state_->GetFileMetadata(partition_id, filename);
+  }
 
   /// Called by scanners when a range is complete. Used to record progress.
   /// This *must* only be called after a scanner has completely finished its
@@ -399,7 +560,7 @@ class HdfsScanNodeBase : public ScanNode {
       const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);
 
   /// Calls RangeComplete() with skipped=true for all the splits of the file
-  void SkipFile(const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
+  void SkipFile(const THdfsFileFormat::type& file_type, const HdfsFileDesc* file);
 
   /// Returns true if there are no materialized slots, such as a count(*) over the table.
   inline bool IsZeroSlotTableScan() const {
@@ -437,6 +598,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// Update book-keeping to skip the scan range if it has been issued but will not be
   /// processed by a scanner. E.g. used to cancel ranges that are filtered out by
   /// late-arriving filters that could not be applied in IssueInitialScanRanges()
+  /// The ScanRange must have been added to a RequestContext.
   void SkipScanRange(io::ScanRange* scan_range);
 
   /// Helper to increase reservation from 'curr_reservation' up to 'ideal_reservation'
@@ -454,6 +616,12 @@ class HdfsScanNodeBase : public ScanNode {
   void UpdateBytesRead(
       SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read);
 
+  /// Get the template tuple which has only the partition columns materialized for the
+  /// partition identified by 'partition_id'.
+  inline Tuple* GetTemplateTupleForPartitionId(int64_t partition_id) {
+    return shared_state_->GetTemplateTupleForPartitionId(partition_id);
+  }
+
  protected:
   friend class ScannerContext;
   friend class HdfsScanner;
@@ -492,11 +660,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// Descriptor for tuples this scan node constructs
   const TupleDescriptor* tuple_desc_ = nullptr;
 
-  /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
-  /// the partition columns for that partition materialized. Used to filter files and scan
-  /// ranges on partition-column filters. Populated in Open().
-  boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
-
   /// Descriptor for the hdfs table, including partition and format metadata.
   /// Set in Prepare, owned by RuntimeState
   const HdfsTableDescriptor* hdfs_table_ = nullptr;
@@ -504,28 +667,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// The root of the table's Avro schema, if we're scanning an Avro table.
   const AvroSchemaElement& avro_schema_;
 
-  /// Partitions scanned by this scan node. Initialized in Prepare() and not modified
-  /// afterwards.
-  std::unordered_set<int64_t> partition_ids_;
-
-  /// This is a pair for partition ID and filename
-  typedef pair<int64_t, std::string> PartitionFileKey;
-
-  /// partition_id, File path => file descriptor (which includes the file's splits)
-  typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap;
-  FileDescMap file_descs_;
-
-  /// File format => file descriptors.
-  typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>>
-    FileFormatsMap;
-  FileFormatsMap per_type_files_;
-
-  /// Scanner specific per file metadata (e.g. header information) and associated lock.
-  /// Key of the map is partition_id, filename pair
-  /// TODO: Remove this lock when removing the legacy scanners and scan nodes.
-  std::mutex metadata_lock_;
-  std::unordered_map<PartitionFileKey, void*, pair_hash> per_file_metadata_;
-
   /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
   /// item tuples). Includes a copy of ExecNode.conjuncts_.
   typedef std::unordered_map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
@@ -537,19 +678,10 @@ class HdfsScanNodeBase : public ScanNode {
   const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map_;
 
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
-  /// the first call to GetNext(). The token manager, in a different thread, will read
-  /// this variable.
+  /// the first call to GetNext() for non-MT scan nodes and in Open() for MT scan nodes.
+  /// Only used by the thread token manager in non-MT scan nodes in a separate thread.
   AtomicBool initial_ranges_issued_;
 
-  /// When this counter drops to 0, AddDiskIoRanges() will not be called again, and
-  /// therefore scanner threads that can't get work should exit. For most
-  /// file formats (except for sequence-based formats), this is 0 after
-  /// IssueInitialRanges(). Note that some scanners (namely Parquet) issue
-  /// additional work to the IO subsystem without using AddDiskIoRanges(),
-  /// but that is managed within the scanner, and doesn't require
-  /// additional scanner threads.
-  AtomicInt32 remaining_scan_range_submissions_ = { 1 };
-
   /// Per scanner type codegen'd fn.
   const boost::unordered_map<THdfsFileFormat::type, void*>& codegend_fn_map_;
 
@@ -567,9 +699,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// Vector containing slot descriptors for all partition key slots.
   const std::vector<SlotDescriptor*>& partition_key_slots_;
 
-  /// Keeps track of total splits and the number finished.
-  ProgressUpdater progress_;
-
   /// Counters which track the number of scanners that have codegen enabled for the
   /// materialize and conjuncts evaluation code paths.
   AtomicInt32 num_scanners_codegen_enabled_;
@@ -642,6 +771,9 @@ class HdfsScanNodeBase : public ScanNode {
   /// can update the map concurrently
   boost::shared_mutex bytes_read_per_col_lock_;
 
+  /// Pointer to the scan range related state that is shared across all node instances.
+  ScanRangeSharedState* shared_state_ = nullptr;
+
   /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
   /// issues initial ranges for all file types. Waits for runtime filters if necessary.
   /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
@@ -704,6 +836,11 @@ class HdfsScanNodeBase : public ScanNode {
   /// debug action specified for the query.
   Status ScanNodeDebugAction(TExecNodePhase::type phase) WARN_UNUSED_RESULT;
 
+  /// Fetches the next range in queue to read. Implemented by child classes to fetch
+  /// ranges from their implementation of a queue that is used to organize ranges.
+  virtual Status GetNextScanRangeToRead(
+      io::ScanRange** scan_range, bool* needs_buffers) = 0;
+
  private:
   class HdfsCompressionTypesSet {
    public:
@@ -753,7 +890,6 @@ class HdfsScanNodeBase : public ScanNode {
       FileTypeCountsMap;
   FileTypeCountsMap file_type_counts_;
 };
-
 }
 
 #endif
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index c200392..85aada3 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -19,9 +19,11 @@
 
 #include <sstream>
 
+#include "exec/exec-node-util.h"
 #include "exec/scanner-context.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/exec-env.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 
@@ -33,12 +35,11 @@ using namespace impala::io;
 
 namespace impala {
 
-HdfsScanNodeMt::HdfsScanNodeMt(ObjectPool* pool, const HdfsScanPlanNode& pnode,
-                           const DescriptorTbl& descs)
-    : HdfsScanNodeBase(pool, pnode, pnode.tnode_->hdfs_scan_node, descs),
-      scan_range_(NULL),
-      scanner_(NULL) {
-}
+HdfsScanNodeMt::HdfsScanNodeMt(
+    ObjectPool* pool, const HdfsScanPlanNode& pnode, const DescriptorTbl& descs)
+  : HdfsScanNodeBase(pool, pnode, pnode.tnode_->hdfs_scan_node, descs),
+    scan_range_(NULL),
+    scanner_(NULL) {}
 
 HdfsScanNodeMt::~HdfsScanNodeMt() {
 }
@@ -50,14 +51,17 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
 
 Status HdfsScanNodeMt::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  ScopedOpenEventAdder ea(this);
   RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
   DCHECK(!initial_ranges_issued_.Load());
+  shared_state_->AddCancellationHook(state);
   RETURN_IF_ERROR(IssueInitialScanRanges(state));
   return Status::OK();
 }
 
 Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
+  ScopedGetNextEventAdder ea(this, eos);
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
@@ -140,4 +144,26 @@ void HdfsScanNodeMt::Close(RuntimeState* state) {
   HdfsScanNodeBase::Close(state);
 }
 
+Status HdfsScanNodeMt::AddDiskIoRanges(
+    const vector<ScanRange*>& ranges, EnqueueLocation enqueue_location) {
+  DCHECK(!shared_state_->progress().done())
+      << "Don't call AddScanRanges() after all ranges finished.";
+  DCHECK_GT(shared_state_->RemainingScanRangeSubmissions(), 0);
+  DCHECK_GT(ranges.size(), 0);
+  bool at_front = false;
+  if (enqueue_location == EnqueueLocation::HEAD) {
+    at_front = true;
+  }
+  shared_state_->EnqueueScanRange(ranges, at_front);
+  return Status::OK();
+}
+
+Status HdfsScanNodeMt::GetNextScanRangeToRead(
+    io::ScanRange** scan_range, bool* needs_buffers) {
+  RETURN_IF_ERROR(shared_state_->GetNextScanRange(runtime_state_, scan_range));
+  if (*scan_range != nullptr) {
+    RETURN_IF_ERROR(reader_context_->StartScanRange(*scan_range, needs_buffers));
+  }
+  return Status::OK();
+}
 }
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 98ff6c2..8797ecd 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -41,8 +41,8 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
       ObjectPool* pool, const HdfsScanPlanNode& pnode, const DescriptorTbl& descs);
   ~HdfsScanNodeMt();
 
-  virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
-  virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
       override WARN_UNUSED_RESULT;
   virtual void Close(RuntimeState* state) override;
@@ -50,12 +50,20 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
   virtual bool HasRowBatchQueue() const override { return false; }
   virtual ExecutionModel getExecutionModel() const override { return TASK_BASED; }
 
+  /// Adds the range to a queue shared among all instances of this scan node.
+  Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
+        EnqueueLocation enqueue_location = EnqueueLocation::TAIL) override;
+
+ protected:
+  /// Fetches the next range to read from a queue shared among all instances of this scan
+  /// node. Also schedules it to be read by disk threads via the reader context.
+  Status GetNextScanRangeToRead(io::ScanRange** scan_range, bool* needs_buffers) override;
+
  private:
   /// Create and open new scanner for this partition type.
   /// If the scanner is successfully created and opened, it is returned in 'scanner'.
   Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
-      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
-      WARN_UNUSED_RESULT;
+      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
 
   /// Current scan range and corresponding scanner.
   io::ScanRange* scan_range_;
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 8601599..d9d7985 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -105,7 +105,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     // Release the scanner threads
     discard_result(ranges_issued_barrier_.Notify());
 
-    if (progress_.done()) SetDone();
+    if (shared_state_->progress().done()) SetDone();
   }
 
   Status status = GetNextInternal(state, row_batch, eos);
@@ -184,8 +184,8 @@ void HdfsScanNode::Close(RuntimeState* state) {
   // remaining_scan_range_submissions_ should be 0, if the
   // query started and wasn't cancelled or exited early.
   if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_.Load()
-      && progress_.done()) {
-    DCHECK_EQ(remaining_scan_range_submissions_.Load(), 0);
+      && shared_state_->progress().done()) {
+    DCHECK_EQ(shared_state_->RemainingScanRangeSubmissions(), 0);
   }
 #endif
   HdfsScanNodeBase::Close(state);
@@ -209,7 +209,7 @@ void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
 
 Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
     EnqueueLocation enqueue_location) {
-  DCHECK_GT(remaining_scan_range_submissions_.Load(), 0);
+  DCHECK_GT(shared_state_->RemainingScanRangeSubmissions(), 0);
   RETURN_IF_ERROR(reader_context_->AddScanRanges(ranges, enqueue_location));
   if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
   return Status::OK();
@@ -226,8 +226,8 @@ int64_t HdfsScanNode::EstimateScannerThreadMemConsumption() const {
   // Note: this is crude and we could try to refine it by factoring in the number of
   // columns, etc, but it is unclear how beneficial this would be.
   int64_t est_non_reserved_bytes = FLAGS_hdfs_scanner_thread_max_estimated_bytes;
-  auto it = per_type_files_.find(THdfsFileFormat::TEXT);
-  if (it != per_type_files_.end()) {
+  auto it = shared_state_->per_type_files().find(THdfsFileFormat::TEXT);
+  if (it != shared_state_->per_type_files().end()) {
     for (HdfsFileDesc* file : it->second) {
       if (file->file_compression != THdfsCompression::NONE) {
         int64_t compressed_text_est_bytes =
@@ -301,7 +301,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     const int64_t scanner_thread_reservation = resource_profile_.min_reservation;
     // Cases 1, 2, 3.
     if (done() || all_ranges_started_ ||
-        num_active_scanner_threads >= progress_.remaining()) {
+        num_active_scanner_threads >= shared_state_->progress().remaining()) {
       break;
     }
 
@@ -399,7 +399,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     // Take a snapshot of remaining_scan_range_submissions before calling
     // StartNextScanRange().  We don't want it to go to zero between the return from
     // StartNextScanRange() and the check for when all ranges are complete.
-    int remaining_scan_range_submissions = remaining_scan_range_submissions_.Load();
+    int remaining_scan_range_submissions = shared_state_->RemainingScanRangeSubmissions();
     ScanRange* scan_range;
     Status status =
         StartNextScanRange(filter_ctxs, &scanner_thread_reservation, &scan_range);
@@ -420,7 +420,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     }
 
     // Done with range and it completed successfully
-    if (progress_.done()) {
+    if (shared_state_->progress().done()) {
       // All ranges are finished.  Indicate we are done.
       SetDone();
       break;
@@ -522,7 +522,7 @@ void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     SetError(status);
   }
   // Transfer remaining resources to a final batch and add it to the row batch queue and
-  // decrement progress_ to indicate that the scan range is complete.
+  // decrement progress() to indicate that the scan range is complete.
   scanner->Close();
   // Reservation may have been increased by the scanner, e.g. Parquet may allocate
   // additional reservation to scan columns.
@@ -551,3 +551,8 @@ void HdfsScanNode::SetError(const Status& status) {
   unique_lock<timed_mutex> l(lock_);
   SetDoneInternal(status);
 }
+
+Status HdfsScanNode::GetNextScanRangeToRead(
+    io::ScanRange** scan_range, bool* needs_buffers) {
+  return reader_context_->GetNextUnstartedRange(scan_range, needs_buffers);
+}
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 8d331e3..def5d8b 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -111,6 +111,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
     return NON_TASK_BASED_SYNC;
   }
 
+ protected:
+  /// Fetches the next range to be read from the reader context. As a side effect, the
+  /// reader context also schedules it to be read by disk threads.
+  Status GetNextScanRangeToRead(io::ScanRange** scan_range, bool* needs_buffers) override;
+
  private:
   ScannerThreadState thread_state_;
 
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index c9796e6..f8f9a4d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -112,7 +112,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
   // Initialize the template_tuple_, it is copied from the template tuple map in the
   // HdfsScanNodeBase.
   Tuple* template_tuple =
-      scan_node_->partition_template_tuple_map_[context_->partition_descriptor()->id()];
+      scan_node_->GetTemplateTupleForPartitionId(context_->partition_descriptor()->id());
   if (template_tuple != nullptr) {
     template_tuple_ =
         template_tuple->DeepCopy(*scan_node_->tuple_desc(), template_tuple_pool_.get());
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 9ae980e..a18627b 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -644,7 +644,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
   // For other compressed text: attempt to read and decompress the entire file, point
   // to the decompressed buffer, and then continue normal processing.
   DCHECK(decompression_type_ != THdfsCompression::SNAPPY);
-  HdfsFileDesc* desc = scan_node_->GetFileDesc(
+  const HdfsFileDesc* desc = scan_node_->GetFileDesc(
       context_->partition_descriptor()->id(), stream_->filename());
   int64_t file_size = desc->file_length;
   DCHECK_GT(file_size, 0);
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index c16bda8..7738410 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -70,12 +70,6 @@ static const string ACTUAL_RESERVATION_COUNTER_NAME = "ParquetRowGroupActualRese
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   DCHECK(!files.empty());
-  // Add Parquet-specific counters.
-  ADD_SUMMARY_STATS_COUNTER(
-      scan_node->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
-  ADD_SUMMARY_STATS_COUNTER(
-      scan_node->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
-
   for (HdfsFileDesc* file : files) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
     if (file->file_length < 12) {
@@ -138,6 +132,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       scan_node_->runtime_profile(), "ParquetUncompressedPageSize", TUnit::BYTES);
   process_page_index_stats_ =
       ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "PageIndexProcessingTime");
+  row_group_ideal_reservation_counter_ = ADD_SUMMARY_STATS_COUNTER(
+      scan_node_->runtime_profile(), IDEAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
+  row_group_actual_reservation_counter_ = ADD_SUMMARY_STATS_COUNTER(
+      scan_node_->runtime_profile(), ACTUAL_RESERVATION_COUNTER_NAME, TUnit::BYTES);
 
   codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
       scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
@@ -565,8 +563,8 @@ Status HdfsParquetScanner::NextRowGroup() {
   int64_t split_offset = split_range->offset();
   int64_t split_length = split_range->len();
 
-  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(
-      context_->partition_descriptor()->id(), filename());
+  const HdfsFileDesc* file_desc =
+      scan_node_->GetFileDesc(context_->partition_descriptor()->id(), filename());
 
   bool start_with_first_row_group = row_group_idx_ == -1;
   bool misaligned_row_group_skipped = false;
@@ -1652,10 +1650,8 @@ Status HdfsParquetScanner::DivideReservationBetweenColumns(
   if (ideal_reservation > context_->total_reservation()) {
     context_->TryIncreaseReservation(ideal_reservation);
   }
-  scan_node_->runtime_profile()->GetSummaryStatsCounter(ACTUAL_RESERVATION_COUNTER_NAME)->
-      UpdateCounter(context_->total_reservation());
-  scan_node_->runtime_profile()->GetSummaryStatsCounter(IDEAL_RESERVATION_COUNTER_NAME)->
-      UpdateCounter(ideal_reservation);
+  row_group_actual_reservation_counter_->UpdateCounter(context_->total_reservation());
+  row_group_ideal_reservation_counter_->UpdateCounter(ideal_reservation);
 
   vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
       min_buffer_size, max_buffer_size, col_range_lengths, context_->total_reservation());
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 674441c..61b3354 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -488,6 +488,11 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// to this counter
   RuntimeProfile::SummaryStatsCounter* parquet_uncompressed_page_size_counter_;
 
+  /// Average and min/max memory reservation for a scanning a row group, both
+  /// ideal(calculated based on min and max buffer size) and actual.
+  RuntimeProfile::SummaryStatsCounter* row_group_ideal_reservation_counter_;
+  RuntimeProfile::SummaryStatsCounter* row_group_actual_reservation_counter_;
+
   /// Number of collection items read in current row batch. It is a scanner-local counter
   /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
   /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 4445ad9..1c2a38a 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -248,6 +248,13 @@ class ScanRange : public RequestRange {
     int64_t length;
   };
 
+  /// Allocate a scan range object stored in the given 'obj_pool' and calls Reset() on it
+  /// with the rest of the input variables.
+  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+      int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
+      int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
+      const BufferOpts& buffer_opts);
+
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
   /// local filesystem). The scan range must be non-empty and fall within the file bounds
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index d9d566d..f1b0bde 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -449,6 +449,21 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
       buffer_opts, {}, meta_data);
 }
 
+ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+    int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
+    int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
+    const BufferOpts& buffer_opts) {
+  DCHECK_GE(disk_id, -1);
+  DCHECK_GE(offset, 0);
+  DCHECK_GE(len, 0);
+  disk_id =
+      ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(file, disk_id, expected_local);
+  ScanRange* range = obj_pool->Add(new ScanRange);
+  range->Reset(fs, file, len, offset, disk_id, expected_local, is_erasure_coded, mtime,
+      buffer_opts, move(sub_ranges), metadata);
+  return range;
+}
+
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
     int disk_id, bool expected_local, bool is_erasure_coded, int64_t mtime,
     const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges, void* meta_data) {
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 45d652e..3a132a7 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -739,11 +739,11 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
 
   // Test handling of the single instance case - all ranges go to the same instance.
   vector<vector<ScanRangeParamsPB>> fs_one_instance =
-      Scheduler::AssignRangesToInstances(1, &fs_ranges);
+      Scheduler::AssignRangesToInstances(1, fs_ranges);
   ASSERT_EQ(1, fs_one_instance.size());
   EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
   vector<vector<ScanRangeParamsPB>> kudu_one_instance =
-      Scheduler::AssignRangesToInstances(1, &kudu_ranges);
+      Scheduler::AssignRangesToInstances(1, kudu_ranges);
   ASSERT_EQ(1, kudu_one_instance.size());
   EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
 
@@ -751,17 +751,11 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
     vector<vector<ScanRangeParamsPB>> range_per_instance =
-        Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
+        Scheduler::AssignRangesToInstances(NUM_RANGES, fs_ranges);
     EXPECT_EQ(NUM_RANGES, range_per_instance.size());
     // Confirm each range is present and each instance got exactly one range.
-    vector<int> range_length_count(NUM_RANGES);
-    for (const auto& instance_ranges : range_per_instance) {
-      ASSERT_EQ(1, instance_ranges.size());
-      ++range_length_count[instance_ranges[0].scan_range().hdfs_file_split().length()
-          - 1];
-    }
     for (int i = 0; i < NUM_RANGES; ++i) {
-      EXPECT_EQ(1, range_length_count[i]) << i;
+      EXPECT_EQ(1, range_per_instance[i].size()) << i;
     }
   }
 
@@ -770,33 +764,17 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
     vector<vector<ScanRangeParamsPB>> range_per_instance =
-        Scheduler::AssignRangesToInstances(4, &fs_ranges);
+        Scheduler::AssignRangesToInstances(4, fs_ranges);
     EXPECT_EQ(4, range_per_instance.size());
-    // Ensure we got a range of each length in the output.
-    vector<int> range_length_count(NUM_RANGES);
-    for (const auto& instance_ranges : range_per_instance) {
-      EXPECT_EQ(4, instance_ranges.size());
-      int64_t instance_bytes = 0;
-      for (const auto& range : instance_ranges) {
-        instance_bytes += range.scan_range().hdfs_file_split().length();
-        ++range_length_count[range.scan_range().hdfs_file_split().length() - 1];
-      }
-      // Expect each instance to get sum([1, 2, ..., 16]) / 4 bytes when things are
-      // distributed evenly.
-      EXPECT_EQ(34, instance_bytes);
-    }
-    for (int i = 0; i < NUM_RANGES; ++i) {
-      EXPECT_EQ(1, range_length_count[i]) << i;
+    for (int i = 0; i < range_per_instance.size(); ++i) {
+      EXPECT_EQ(4, range_per_instance[i].size()) << i;
     }
   }
-
-  // Test load balancing Kudu ranges across 4 instances. We should get an even assignment
-  // across the instances regardless of input order. We don't know the size of each Kudu
-  // range, so we just need to check the # of ranges.
+  // Now test the same for kudu ranges.
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
     vector<vector<ScanRangeParamsPB>> range_per_instance =
-        Scheduler::AssignRangesToInstances(4, &kudu_ranges);
+        Scheduler::AssignRangesToInstances(4, kudu_ranges);
     EXPECT_EQ(4, range_per_instance.size());
     for (const auto& instance_ranges : range_per_instance) {
       EXPECT_EQ(4, instance_ranges.size());
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index dc8f1f9..e11f273 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -314,34 +314,6 @@ void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
   }
 }
 
-/// Returns a numeric weight that is proportional to the estimated processing time for
-/// the scan range represented by 'params'. Weights from different scan node
-/// implementations, e.g. FS vs Kudu, are not comparable.
-static int64_t ScanRangeWeight(const ScanRangeParamsPB& params) {
-  if (params.scan_range().has_hdfs_file_split()) {
-    return params.scan_range().hdfs_file_split().length();
-  } else {
-    // Give equal weight to each Kudu and Hbase split.
-    // TODO: implement more accurate logic for Kudu and Hbase
-    return 1;
-  }
-}
-
-/// Helper class used in CreateScanInstances() to track the amount of work assigned
-/// to each instance so far.
-struct InstanceAssignment {
-  // The weight assigned so far.
-  int64_t weight;
-
-  // The index of the instance in 'per_instance_ranges'
-  int instance_idx;
-
-  // Comparator for use in a heap as part of the longest processing time algo.
-  // Invert the comparison order because the *_heap functions implement a max-heap
-  // and we want to assign to the least-loaded instance first.
-  bool operator<(InstanceAssignment& other) const { return weight > other.weight; }
-};
-
 // Maybe the easiest way to understand the objective of this algorithm is as a
 // generalization of two simpler instance creation algorithms that decide how many
 // instances of a fragment to create on each node, given a set of nodes that were
@@ -439,12 +411,8 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
       if (assignment_it == sra.end()) continue;
       auto scan_ranges_it = assignment_it->second.find(scan_node_id);
       if (scan_ranges_it == assignment_it->second.end()) continue;
-
-      // We reorder the scan ranges vector in-place to avoid creating another copy of it.
-      // This should be safe since the code is single-threaded and other code does not
-      // depend on the order of the vector.
       per_scan_per_instance_ranges.back() =
-          AssignRangesToInstances(max_num_instances, &scan_ranges_it->second);
+          AssignRangesToInstances(max_num_instances, scan_ranges_it->second);
       DCHECK_LE(per_scan_per_instance_ranges.back().size(), max_num_instances);
     }
 
@@ -482,37 +450,18 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_
 }
 
 vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
-    int max_num_instances, vector<ScanRangeParamsPB>* ranges) {
-  // We need to assign scan ranges to instances. We would like the assignment to be
-  // as even as possible, so that each instance does about the same amount of work.
-  // Use longest-processing time (LPT) algorithm, which is a good approximation of the
-  // optimal solution (there is a theoretic bound of ~4/3 of the optimal solution
-  // in the worst case). It also guarantees that at least one scan range is assigned
-  // to each instance.
+    int max_num_instances, vector<ScanRangeParamsPB>& ranges) {
   DCHECK_GT(max_num_instances, 0);
-  int num_instances = min(max_num_instances, static_cast<int>(ranges->size()));
+  int num_instances = min(max_num_instances, static_cast<int>(ranges.size()));
   vector<vector<ScanRangeParamsPB>> per_instance_ranges(num_instances);
   if (num_instances < 2) {
     // Short-circuit the assignment algorithm for the single instance case.
-    per_instance_ranges[0] = *ranges;
+    per_instance_ranges[0] = ranges;
   } else {
-    // The LPT algorithm is straightforward:
-    // 1. sort the scan ranges to be assigned by descending weight.
-    // 2. assign each item to the instance with the least weight assigned so far.
-    vector<InstanceAssignment> instance_heap;
-    instance_heap.reserve(num_instances);
-    for (int i = 0; i < num_instances; ++i) {
-      instance_heap.emplace_back(InstanceAssignment{0, i});
-    }
-    std::sort(ranges->begin(), ranges->end(),
-        [](const ScanRangeParamsPB& a, const ScanRangeParamsPB& b) {
-          return ScanRangeWeight(a) > ScanRangeWeight(b);
-        });
-    for (ScanRangeParamsPB& range : *ranges) {
-      per_instance_ranges[instance_heap[0].instance_idx].push_back(range);
-      instance_heap[0].weight += ScanRangeWeight(range);
-      pop_heap(instance_heap.begin(), instance_heap.end());
-      push_heap(instance_heap.begin(), instance_heap.end());
+    int idx = 0;
+    for (auto& range : ranges) {
+      per_instance_ranges[idx].push_back(range);
+      idx = (idx + 1 == num_instances) ? 0 : idx + 1;
     }
   }
   return per_instance_ranges;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 5167955..b55dfb0 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -382,14 +382,12 @@ class Scheduler {
   void CreateCollocatedAndScanInstances(const ExecutorConfig& executor_config,
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
-  /// Compute an assignment of scan ranges 'ranges' that were assigned to a host to
-  /// at most 'max_num_instances' fragment instances running on the same host.
-  /// Attempts to minimize skew across the instances. 'max_num_ranges' must be
-  /// positive. Only returns non-empty vectors: if there are not enough ranges
-  /// to create 'max_num_instances', fewer instances are assigned ranges.
-  /// May reorder ranges in 'ranges'.
+  /// Does a round robin assignment of scan ranges 'ranges' that were assigned to a host
+  /// to at most 'max_num_instances' fragment instances running on the same host.
+  /// 'max_num_ranges' must be positive. Only returns non-empty vectors: if there are not
+  /// enough ranges to create 'max_num_instances', fewer instances are assigned ranges.
   static std::vector<std::vector<ScanRangeParamsPB>> AssignRangesToInstances(
-      int max_num_instances, std::vector<ScanRangeParamsPB>* ranges);
+      int max_num_instances, std::vector<ScanRangeParamsPB>& ranges);
 
   /// For each instance of fragment_params's input fragment, create a collocated
   /// instance for fragment_params's fragment.
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 37cbda1..382fc8a 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -18,6 +18,7 @@
 # Tests queries with the MT_DOP query option.
 
 import pytest
+import logging
 
 from copy import deepcopy
 from tests.common.environ import ImpalaTestClusterProperties, build_flavor_timeout
@@ -28,6 +29,8 @@ from tests.common.skip import SkipIfABFS, SkipIfEC, SkipIfNotHdfsMinicluster
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import IS_HDFS
 
+LOG = logging.getLogger('test_mt_dop')
+
 WAIT_TIME_MS = build_flavor_timeout(60000, slow_build_timeout=100000)
 
 # COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include
@@ -93,6 +96,48 @@ class TestMtDop(ImpalaTestSuite):
       vector.get_value('exec_option'))
     assert expected_results in results.data
 
+
+class TestMtDopScanNode(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDopScanNode, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'text' and v.get_value(
+        'table_format').compression_codec == 'none')
+
+  def test_mt_dop_scan_node(self, vector, unique_database):
+    """Regression test to make sure scan ranges are shared among all scan node instances
+    when using mt_dop. This runs a selective hash join that will dynamically prune
+    partitions leaving less than 5% of the data. Before IMPALA-9655 this would almost
+    always result in a failure where at least one instance would have all its statically
+    assigned scan ranges pruned."""
+    fq_table_name = "%s.store_sales_subset" % unique_database
+    self.execute_query("create table %s as select distinct(ss_sold_date_sk) as "
+                       "sold_date from tpcds.store_sales limit 50" % fq_table_name)
+    vector.get_value('exec_option')['mt_dop'] = 10
+    vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 100000
+
+    # Since this depends on instances fetching scan ranges from a shared queue, running
+    # it multiple times ensures any flakiness is removed.
+    NUM_TRIES = 10
+    failed_count = 0
+    for i in xrange(NUM_TRIES):
+      try:
+        result = self.execute_query(
+          "select count(ss_sold_date_sk) from tpcds.store_sales, %s where "
+          "ss_sold_date_sk = sold_date" % fq_table_name,
+          vector.get_value('exec_option'))
+        assert "- BytesRead: 0" not in result.runtime_profile, result.runtime_profile
+        break
+      except Exception:
+        failed_count += 1
+        if i == NUM_TRIES - 1: raise
+    LOG.info("Num of times failed before success {0}".format(failed_count))
+
 class TestMtDopParquet(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6697a71..95cd782 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -70,6 +70,8 @@ DEBUG_ACTION_DIMS = [None,
 # Trigger injected soft limit failures when scanner threads check memory limit.
 DEBUG_ACTION_DIMS.append('HDFS_SCANNER_THREAD_CHECK_SOFT_MEM_LIMIT:FAIL@0.5')
 
+MT_DOP_VALUES = [0, 1, 4]
+
 class TestScannersAllTableFormats(ImpalaTestSuite):
   BATCH_SIZES = [0, 1, 16]
 
@@ -130,6 +132,7 @@ class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestScannersAllTableFormatsWithLimit, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
 
   def test_limit(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = 1
@@ -171,6 +174,7 @@ class TestScannersMixedTableFormats(ImpalaTestSuite):
         ImpalaTestDimension('batch_size', *TestScannersAllTableFormats.BATCH_SIZES))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('debug_action', *DEBUG_ACTION_DIMS))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
 
   def test_mixed_format(self, vector):
     new_vector = deepcopy(vector)
@@ -609,6 +613,45 @@ class TestParquet(ImpalaTestSuite):
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.multiple_impalad
   @SkipIfEC.fix_later
+  def test_multiple_blocks_mt_dop(self, vector):
+    """Sanity check for MT scan nodes to make sure all blocks from the same file are read.
+    2 scan ranges per node should be created to read 'lineitem_sixblocks' because
+    there are 6 blocks and 3 scan nodes. We set mt_dop to 2, so ideally every instance
+    should read a single range, but since they share a queue its not deterministic and
+    instead we verify sum of ranges read on a backend is 2."""
+    query = 'select count(l_orderkey) from functional_parquet.lineitem_sixblocks'
+    try:
+      self.client.set_configuration_option('mt_dop', '2')
+      result = self.client.execute(query)
+      TOTAL_ROWS = 40000
+      ranges_complete_list = re.findall(r'ScanRangesComplete: ([0-9]*)',
+        result.runtime_profile)
+      num_rows_read_list = re.findall(r'RowsRead: [0-9.K]* \(([0-9]*)\)',
+        result.runtime_profile)
+      # The extra fragment is the "Averaged Fragment"
+      assert len(num_rows_read_list) == 7
+      assert len(ranges_complete_list) == 7
+
+      total_rows_read = 0
+      # Skip the Averaged Fragment; it comes first in the runtime profile.
+      for num_row_read in num_rows_read_list[1:]:
+        total_rows_read += int(num_row_read)
+      assert total_rows_read == TOTAL_ROWS
+
+      # Again skip the Averaged Fragment; it comes first in the runtime profile.
+      # With mt_dop 2, every backend will have 2 instances which are printed consecutively
+      # in the profile.
+      for i in range(1, len(ranges_complete_list), 2):
+        assert int(ranges_complete_list[i]) + int(ranges_complete_list[i + 1]) == 2
+    finally:
+      self.client.clear_configuration()
+
+  @SkipIfS3.hdfs_block_size
+  @SkipIfABFS.hdfs_block_size
+  @SkipIfADLS.hdfs_block_size
+  @SkipIfIsilon.hdfs_block_size
+  @SkipIfLocal.multiple_impalad
+  @SkipIfEC.fix_later
   def test_multiple_blocks(self, vector):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
     # each impalad should read 1 scan range.
@@ -662,9 +705,9 @@ class TestParquet(ImpalaTestSuite):
 
     # This will fail if the number of impalads != 3
     # The fourth fragment is the "Averaged Fragment"
-    assert len(num_row_groups_list) == 4
-    assert len(scan_ranges_complete_list) == 4
-    assert len(num_rows_read_list) == 4
+    assert len(num_row_groups_list) == 4, result.runtime_profile
+    assert len(scan_ranges_complete_list) == 4, result.runtime_profile
+    assert len(num_rows_read_list) == 4, result.runtime_profile
 
     total_num_row_groups = 0
     # Skip the Averaged Fragment; it comes first in the runtime profile.