Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/08 04:39:37 UTC

[7/7] incubator-impala git commit: IMPALA-3905: Add single-threaded scan node.

IMPALA-3905: Add single-threaded scan node.

Adds a new single-threaded scan node HdfsScanNodeMt that
materializes tuples in the thread calling GetNext().
The new scan node uses the HdfsScanner::GetNext() interface,
which is currently only implemented for Parquet.
As before, I/O is performed asynchronously via the I/O manager.
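
For illustration only (this loop is not part of the patch; the actual
driver is the fragment execution code), the new node is consumed like
any other ExecNode, with tuple materialization happening inside the
GetNext() call:

  RowBatch batch(scan_node->row_desc(), state->batch_size(),
      scan_node->mem_tracker());
  bool eos = false;
  while (!eos) {
    // Tuples are materialized here, in the calling thread.
    RETURN_IF_ERROR(scan_node->GetNext(state, &batch, &eos));
    // ... hand the batch to the parent operator ...
    batch.Reset();
  }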

The new scan node is used if the mt_num_cores query option is
set to a value greater than 0. Otherwise, the existing
multi-threaded scan node is used.
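
Concretely, the factory in ExecNode::CreateNode() picks the
implementation based on that option (quoted from the exec-node.cc hunk
further down):

  if (state->query_options().mt_num_cores > 0) {
    *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs));
  } else {
    *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
  }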

The changes are mostly a refactoring of the existing multi-threaded
scan node to separate out the code shared between it and the new
single-threaded one.

Summary of changes:
- Move code from hdfs-scan-node.h/cc into a new hdfs-scan-node-base.h/cc
- Add new single-threaded scan node in hdfs-scan-node-mt.h/cc
- Both scan nodes inherit from HdfsScanNodeBase
- Rework the allocation of template tuples such that the memory is drawn
  from a new mem pool owned by each scanner, and such that each scanner
  clones the partition expr contexts (a short sketch follows this list).
  Before, the memory was taken from the parent scan node's mem pool, and
  there was only one instance of the partition expr contexts. Access to
  them was protected by a lock, but not in all places, so their use was
  not always obviously correct. The change in this patch makes thread
  safety obvious and helps move a lock into the multi-threaded scan node
  that would otherwise have to remain in the HdfsScanNodeBase class.
- Simplify a couple of loops with C++11 for-each
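
As a rough sketch of the new ownership model (the calls are taken from
this patch, but the exact call sites are in hdfs-scanner.cc and
hdfs-scan-node-base.cc; the snippet below is illustrative, not a
verbatim excerpt):

  // Template tuple memory now comes from a pool owned by the scanner
  // itself, so scanners never share the parent node's mem pool.
  Tuple* template_tuple =
      Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());

  // Partition key exprs are cloned before use, so evaluating them
  // needs no shared, locked instance.
  vector<ExprContext*> partition_key_value_ctxs;
  RETURN_IF_ERROR(Expr::CloneIfNotExists(
      partition_desc->partition_key_value_ctxs(), state,
      &partition_key_value_ctxs));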

Testing: A private core/hdfs run passed. I ran TPC-H/DS and test_scanners.py
on ASAN several times locally.

Change-Id: I98cc7f970e1575dd83875609985e1877ada3d5e0
Reviewed-on: http://gerrit.cloudera.org:8080/4113
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Alex Behm <al...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/faebfebd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/faebfebd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/faebfebd

Branch: refs/heads/master
Commit: faebfebdfd227b8f72055e60e13ac8726eafc8fc
Parents: 8c37bf3
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Jul 29 10:24:42 2016 -0700
Committer: Alex Behm <al...@cloudera.com>
Committed: Thu Sep 8 04:34:52 2016 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   2 +
 be/src/exec/base-sequence-scanner.cc            |  31 +-
 be/src/exec/base-sequence-scanner.h             |   4 +-
 be/src/exec/exec-node.cc                        |   6 +
 be/src/exec/hdfs-avro-scanner.cc                |  28 +-
 be/src/exec/hdfs-avro-scanner.h                 |   9 +-
 be/src/exec/hdfs-lzo-text-scanner.cc            |   8 +-
 be/src/exec/hdfs-lzo-text-scanner.h             |  11 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  58 +-
 be/src/exec/hdfs-parquet-scanner.h              |  10 +-
 be/src/exec/hdfs-rcfile-scanner.cc              |   5 +-
 be/src/exec/hdfs-rcfile-scanner.h               |   3 +-
 be/src/exec/hdfs-scan-node-base.cc              | 893 ++++++++++++++++++
 be/src/exec/hdfs-scan-node-base.h               | 459 +++++++++
 be/src/exec/hdfs-scan-node-mt.cc                | 125 +++
 be/src/exec/hdfs-scan-node-mt.h                 |  59 ++
 be/src/exec/hdfs-scan-node.cc                   | 920 +------------------
 be/src/exec/hdfs-scan-node.h                    | 453 +--------
 be/src/exec/hdfs-scanner-ir.cc                  |   2 +-
 be/src/exec/hdfs-scanner.cc                     |  58 +-
 be/src/exec/hdfs-scanner.h                      |  87 +-
 be/src/exec/hdfs-sequence-scanner.cc            |   8 +-
 be/src/exec/hdfs-sequence-scanner.h             |   6 +-
 be/src/exec/hdfs-text-scanner.cc                |  22 +-
 be/src/exec/hdfs-text-scanner.h                 |   8 +-
 be/src/exec/scanner-context.cc                  |   6 +-
 be/src/exec/scanner-context.h                   |  11 +-
 be/src/exprs/expr-context.h                     |   5 +-
 be/src/runtime/tuple.h                          |   5 +-
 common/thrift/ImpalaInternalService.thrift      |   7 +-
 .../com/cloudera/impala/service/Frontend.java   |   2 +-
 31 files changed, 1837 insertions(+), 1474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 091b9b7..8f12656 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -46,6 +46,8 @@ add_library(Exec
   hbase-table-sink.cc
   hbase-table-writer.cc
   hdfs-scan-node.cc
+  hdfs-scan-node-base.cc
+  hdfs-scan-node-mt.cc
   hdfs-scanner.cc
   hdfs-scanner-ir.cc
   hdfs-table-sink.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 5429e04..5f6bad6 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -41,7 +41,7 @@ static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes
 // Macro to convert between SerdeUtil errors to Status returns.
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNode* scan_node,
+Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   // Issue just the header range for each file.  When the header is complete,
   // we'll issue the splits for that file.  Splits cannot be processed until the
@@ -49,7 +49,7 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNode* scan_node,
   vector<DiskIoMgr::ScanRange*> header_ranges;
   for (int i = 0; i < files.size(); ++i) {
     ScanRangeMetadata* metadata =
-        reinterpret_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
+        static_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data());
     int64_t header_size = min<int64_t>(HEADER_SIZE, files[i]->file_length);
     // The header is almost always a remote read. Set the disk id to -1 and indicate
     // it is not cached.
@@ -66,9 +66,8 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNode* scan_node,
   return Status::OK();
 }
 
-BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* state,
-    bool add_batches_to_queue)
-  : HdfsScanner(node, state, add_batches_to_queue),
+BaseSequenceScanner::BaseSequenceScanner(HdfsScanNodeBase* node, RuntimeState* state)
+  : HdfsScanner(node, state),
     header_(NULL),
     block_start_(0),
     total_block_size_(0),
@@ -110,12 +109,23 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
   if (row_batch != NULL) {
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
     context_->ReleaseCompletedResources(row_batch, true);
-    if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+    }
+  }
+  // Transfer template tuple pool to scan node pool. The scanner may be closed and
+  // subsequently re-used for another range, so we need to ensure that the template
+  // tuples are backed by live memory.
+  if (template_tuple_pool_.get() != NULL) {
+    static_cast<HdfsScanNode*>(scan_node_)->TransferToScanNodePool(
+        template_tuple_pool_.get());
   }
+
   // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
-  // 'header_' can be NULL if HdfsScanNode::CreateAndPrepareScanner() failed.
+  // 'header_' can be NULL if HdfsScanNodeBase::CreateAndOpenScanner() failed.
   if (!only_parsing_header_ && header_ != NULL) {
     scan_node_->RangeComplete(file_format(), header_->compression_type);
   }
@@ -123,9 +133,9 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
 }
 
 Status BaseSequenceScanner::ProcessSplit() {
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
   header_ = reinterpret_cast<FileHeader*>(
-      scan_node_->GetFileMetadata(stream_->filename()));
+      static_cast<HdfsScanNode*>(scan_node_)->GetFileMetadata(stream_->filename()));
   if (header_ == NULL) {
     // This is the initial scan range just to parse the header
     only_parsing_header_ = true;
@@ -139,7 +149,8 @@ Status BaseSequenceScanner::ProcessSplit() {
     }
 
     // Header is parsed, set the metadata in the scan node and issue more ranges
-    scan_node_->SetFileMetadata(stream_->filename(), header_);
+    static_cast<HdfsScanNode*>(scan_node_)->SetFileMetadata(
+        stream_->filename(), header_);
     HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
     scan_node_->AddDiskIoRanges(desc);
     return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 55e3728..a909e00 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -38,7 +38,7 @@ class ScannerContext;
 class BaseSequenceScanner : public HdfsScanner {
  public:
   /// Issue the initial ranges for all sequence container files.
-  static Status IssueInitialRanges(HdfsScanNode* scan_node,
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
   virtual Status Open(ScannerContext* context);
@@ -102,7 +102,7 @@ class BaseSequenceScanner : public HdfsScanner {
   /// Returns type of scanner: e.g. rcfile, seqfile
   virtual THdfsFileFormat::type file_format() const = 0;
 
-  BaseSequenceScanner(HdfsScanNode*, RuntimeState*, bool);
+  BaseSequenceScanner(HdfsScanNodeBase*, RuntimeState*);
 
   /// Read and validate sync marker against header_->sync.  Returns non-ok if the sync
   /// marker did not match. Scanners should always use this function to read sync markers,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 0996037..59ac997 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -35,6 +35,7 @@
 #include "exec/hash-join-node.h"
 #include "exec/hbase-scan-node.h"
 #include "exec/hdfs-scan-node.h"
+#include "exec/hdfs-scan-node-mt.h"
 #include "exec/kudu-scan-node.h"
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
@@ -264,6 +265,10 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
   switch (tnode.node_type) {
     case TPlanNodeType::HDFS_SCAN_NODE:
-      *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
+      if (state->query_options().mt_num_cores > 0) {
+        *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs));
+      } else {
+        *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
+      }
       // If true, this node requests codegen over interpretation for conjuncts
       // evaluation whenever possible. Turn codegen on for expr evaluation for
       // the entire fragment.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index ff278de..f511b12 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -55,9 +55,8 @@ const string AVRO_MEM_LIMIT_EXCEEDED = "HdfsAvroScanner::$0() failed to allocate
 
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state,
-    bool add_batches_to_queue)
-  : BaseSequenceScanner(scan_node, state, add_batches_to_queue),
+HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+  : BaseSequenceScanner(scan_node, state),
     avro_header_(NULL),
     codegend_decode_avro_data_(NULL) {
 }
@@ -78,7 +77,7 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
   return Status::OK();
 }
 
-Status HdfsAvroScanner::Codegen(HdfsScanNode* node,
+Status HdfsAvroScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) {
   *decode_avro_data_fn = NULL;
   if (!node->runtime_state()->codegen_enabled()) {
@@ -291,8 +290,12 @@ Status HdfsAvroScanner::ResolveSchemas(const AvroSchemaElement& table_root,
 Status HdfsAvroScanner::WriteDefaultValue(
     SlotDescriptor* slot_desc, avro_datum_t default_value, const char* field_name) {
   if (avro_header_->template_tuple == NULL) {
-    avro_header_->template_tuple = template_tuple_ != NULL ?
-        template_tuple_ : scan_node_->InitEmptyTemplateTuple(*scan_node_->tuple_desc());
+    if (template_tuple_ != NULL) {
+      avro_header_->template_tuple = template_tuple_;
+    } else {
+      avro_header_->template_tuple =
+          Tuple::Create(tuple_byte_size_, template_tuple_pool_.get());
+    }
   }
   switch (default_value->type) {
     case AVRO_BOOLEAN: {
@@ -336,14 +339,11 @@ Status HdfsAvroScanner::WriteDefaultValue(
     case AVRO_STRING:
     case AVRO_BYTES: {
       RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
-      // Mempools aren't thread safe so make a local one and transfer it
-      // to the scan node pool.
-      MemPool pool(scan_node_->mem_tracker());
       char* v;
       if (avro_string_get(default_value, &v)) DCHECK(false);
       StringValue sv(v);
-      RawValue::Write(&sv, avro_header_->template_tuple, slot_desc, &pool);
-      scan_node_->TransferToScanNodePool(&pool);
+      RawValue::Write(&sv, avro_header_->template_tuple, slot_desc,
+          template_tuple_pool_.get());
       break;
     }
     case AVRO_NULL:
@@ -772,8 +772,8 @@ void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_
 // bail_out:           ; preds = %read_field11, %end_field3, %read_field2, %end_field,
 //   ret i1 false      ;         %read_field, %entry
 // }
-Status HdfsAvroScanner::CodegenMaterializeTuple(HdfsScanNode* node, LlvmCodeGen* codegen,
-    Function** materialize_tuple_fn) {
+Status HdfsAvroScanner::CodegenMaterializeTuple(HdfsScanNodeBase* node,
+    LlvmCodeGen* codegen, Function** materialize_tuple_fn) {
   LLVMContext& context = codegen->context();
   LlvmCodeGen::LlvmBuilder builder(context);
 
@@ -839,7 +839,7 @@ Status HdfsAvroScanner::CodegenMaterializeTuple(HdfsScanNode* node, LlvmCodeGen*
 }
 
 Status HdfsAvroScanner::CodegenReadRecord(
-    const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node,
+    const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNodeBase* node,
     LlvmCodeGen* codegen, void* void_builder, Function* fn, BasicBlock* insert_before,
     BasicBlock* bail_out, Value* this_val, Value* pool_val, Value* tuple_val,
     Value* data_val, Value* data_end_val) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index f95cd10..274cf4d 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -89,13 +89,14 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// Avro file: {'O', 'b', 'j', 1}
   static const uint8_t AVRO_VERSION_HEADER[4];
 
-  HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state, bool add_batches_to_queue);
+  HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
 
   virtual Status Open(ScannerContext* context);
 
   /// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if
   /// codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+  static Status Codegen(HdfsScanNodeBase* node,
+      const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** decode_avro_data_fn);
 
  protected:
@@ -205,7 +206,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// schema. Stores the resulting function in 'materialize_tuple_fn' if codegen was
   /// successful or returns an error.
   /// TODO: Codegen a function for each unique file schema.
-  static Status CodegenMaterializeTuple(HdfsScanNode* node, LlvmCodeGen* codegen,
+  static Status CodegenMaterializeTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
       llvm::Function** materialize_tuple_fn);
 
   /// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro
@@ -223,7 +224,7 @@ class HdfsAvroScanner : public BaseSequenceScanner {
   /// - this_val, pool_val, tuple_val, data_val, data_end_val: arguments to
   ///     MaterializeTuple()
   static Status CodegenReadRecord(
-      const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNode* node,
+      const SchemaPath& path, const AvroSchemaElement& record, HdfsScanNodeBase* node,
       LlvmCodeGen* codegen, void* builder, llvm::Function* fn,
       llvm::BasicBlock* insert_before, llvm::BasicBlock* bail_out, llvm::Value* this_val,
       llvm::Value* pool_val, llvm::Value* tuple_val, llvm::Value* data_val,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc
index b5f0525..88ae295 100644
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ b/be/src/exec/hdfs-lzo-text-scanner.cc
@@ -45,13 +45,13 @@ SpinLock HdfsLzoTextScanner::lzo_load_lock_;
 const char* (*GetImpalaLzoBuildVersion)();
 
 HdfsScanner* (*HdfsLzoTextScanner::CreateLzoTextScanner)(
-    HdfsScanNode* scan_node, RuntimeState* state);
+    HdfsScanNodeBase* scan_node, RuntimeState* state);
 
 Status (*HdfsLzoTextScanner::LzoIssueInitialRanges)(
-    HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files);
+    HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
 
 HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
-    HdfsScanNode* scan_node, RuntimeState* state) {
+    HdfsScanNodeBase* scan_node, RuntimeState* state) {
 
   // If the scanner was not loaded then no scans could be issued so we should
   // never get here without having loaded the scanner.
@@ -60,7 +60,7 @@ HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
   return (*CreateLzoTextScanner)(scan_node, state);
 }
 
-Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
+Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   if (LzoIssueInitialRanges == NULL) {
     lock_guard<SpinLock> l(lzo_load_lock_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-lzo-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.h b/be/src/exec/hdfs-lzo-text-scanner.h
index 0c75f97..d6bddf9 100644
--- a/be/src/exec/hdfs-lzo-text-scanner.h
+++ b/be/src/exec/hdfs-lzo-text-scanner.h
@@ -21,7 +21,7 @@
 #include "common/status.h"
 #include "exec/scan-node.h"
 #include "exec/hdfs-scanner.h"
-#include "exec/hdfs-scan-node.h"
+#include "exec/hdfs-scan-node-base.h"
 #include "util/spinlock.h"
 
 namespace impala {
@@ -33,8 +33,9 @@ namespace impala {
 /// GetHdfsLzoTextScanner -- returns a pointer to the Scanner object.
 class HdfsLzoTextScanner {
  public:
-  static HdfsScanner* GetHdfsLzoTextScanner(HdfsScanNode* scan_node, RuntimeState* state);
-  static Status IssueInitialRanges(HdfsScanNode* scan_node,
+  static HdfsScanner* GetHdfsLzoTextScanner(HdfsScanNodeBase* scan_node,
+      RuntimeState* state);
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
  private:
@@ -49,11 +50,11 @@ class HdfsLzoTextScanner {
 
   /// Dynamically linked function to create the Lzo Scanner Object.
   static HdfsScanner* (*CreateLzoTextScanner)
-      (HdfsScanNode* scan_node, RuntimeState* state);
+      (HdfsScanNodeBase* scan_node, RuntimeState* state);
 
   /// Dynamically linked function to set the initial scan ranges.
   static Status (*LzoIssueInitialRanges)(
-      HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files);
+      HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files);
 
   /// Dynamically loads CreateLzoTextScanner and LzoIssueInitialRanges.
   /// lzo_load_lock_ should be taken before calling this method.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 30c07a4..a4f1823 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -71,7 +71,7 @@ const int16_t HdfsParquetScanner::ROW_GROUP_END;
 const int16_t HdfsParquetScanner::INVALID_LEVEL;
 const int16_t HdfsParquetScanner::INVALID_POS;
 
-Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
+Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const std::vector<HdfsFileDesc*>& files) {
   vector<DiskIoMgr::ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
@@ -99,7 +99,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
       // footer range for the split always.
       if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
         ScanRangeMetadata* split_metadata =
-            reinterpret_cast<ScanRangeMetadata*>(split->meta_data());
+            static_cast<ScanRangeMetadata*>(split->meta_data());
         // Each split is processed by first issuing a scan range for the file footer, which
         // is done here, followed by scan ranges for the columns of each row group within
         // the actual split (in InitColumns()). The original split is stored in the
@@ -141,9 +141,8 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
 
 namespace impala {
 
-HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state,
-    bool add_batches_to_queue)
-    : HdfsScanner(scan_node, state, add_batches_to_queue),
+HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+    : HdfsScanner(scan_node, state),
       row_group_idx_(-1),
       row_group_rows_read_(0),
       advance_row_group_(true),
@@ -215,14 +214,22 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 void HdfsParquetScanner::Close(RowBatch* row_batch) {
   if (row_batch != NULL) {
     FlushRowGroupResources(row_batch);
-    if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
-  } else if (!FLAGS_enable_partitioned_hash_join ||
-      !FLAGS_enable_partitioned_aggregation) {
-    // With the legacy aggs/joins the tuple ptrs of the scratch batch are allocated
-    // from the scratch batch's mem pool. We can get into this case if Open() fails.
-    scratch_batch_->mem_pool()->FreeAll();
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+    }
+  } else {
+    if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll();
+    if (!FLAGS_enable_partitioned_hash_join ||
+        !FLAGS_enable_partitioned_aggregation) {
+      // With the legacy aggs/joins the tuple ptrs of the scratch batch are allocated
+      // from the scratch batch's mem pool. We can get into this case if Open() fails.
+      scratch_batch_->mem_pool()->FreeAll();
+    }
   }
+
   // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(dictionary_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
@@ -307,25 +314,25 @@ int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& c
 }
 
 Status HdfsParquetScanner::ProcessSplit() {
-  DCHECK(add_batches_to_queue_);
-  bool scanner_eos = false;
+  DCHECK(scan_node_->HasRowBatchQueue());
+  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
   do {
     StartNewParquetRowBatch();
-    RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos));
-    scan_node_->AddMaterializedRowBatch(batch_);
-  } while (!scanner_eos && !scan_node_->ReachedLimit());
+    RETURN_IF_ERROR(GetNextInternal(batch_));
+    scan_node->AddMaterializedRowBatch(batch_);
+  } while (!eos_ && !scan_node_->ReachedLimit());
 
   // Transfer the remaining resources to this new batch in Close().
   StartNewParquetRowBatch();
   return Status::OK();
 }
 
-Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
+Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   if (scan_node_->IsZeroSlotTableScan()) {
     // There are no materialized slots, e.g. count(*) over the table.  We can serve
     // this query from just the file metadata. We don't need to read the column data.
     if (row_group_rows_read_ == file_metadata_.num_rows) {
-      *eos = true;
+      eos_ = true;
       return Status::OK();
     }
     assemble_rows_timer_.Start();
@@ -363,7 +370,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
     }
     RETURN_IF_ERROR(NextRowGroup());
     if (row_group_idx_ >= file_metadata_.row_groups.size()) {
-      *eos = true;
+      eos_ = true;
       DCHECK(parse_status_.ok());
       return Status::OK();
     }
@@ -374,7 +381,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
   // scan range.
   if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
       FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
-    *eos = true;
+    eos_ = true;
     DCHECK(parse_status_.ok());
     return Status::OK();
   }
@@ -406,7 +413,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
     if (row_group.num_rows == 0) continue;
 
-    const DiskIoMgr::ScanRange* split_range = reinterpret_cast<ScanRangeMetadata*>(
+    const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
         metadata_range_->meta_data())->original_split;
     HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
     RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
@@ -539,7 +546,7 @@ Status HdfsParquetScanner::AssembleRows(
 }
 
 void HdfsParquetScanner::StartNewParquetRowBatch() {
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
   batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
       scan_node_->mem_tracker());
 }
@@ -609,7 +616,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   return num_row_to_commit;
 }
 
-Status HdfsParquetScanner::Codegen(HdfsScanNode* node,
+Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** process_scratch_batch_fn) {
   *process_scratch_batch_fn = NULL;
   if (!node->runtime_state()->codegen_enabled()) {
@@ -929,7 +936,8 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
       // Update the template tuple to put a NULL in this slot.
       Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
       if (*template_tuple == NULL) {
-        *template_tuple = scan_node_->InitEmptyTemplateTuple(tuple_desc);
+        *template_tuple =
+            Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
       }
       (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
       continue;
@@ -1110,7 +1118,7 @@ Status HdfsParquetScanner::InitColumns(
     }
 
     const DiskIoMgr::ScanRange* split_range =
-        reinterpret_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
+        static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
 
     // Determine if the column is completely contained within a local split.
     bool column_range_local = split_range->expected_local() &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 425444f..7707083 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -320,13 +320,12 @@ class BoolColumnReader;
 /// the ScannerContext.
 class HdfsParquetScanner : public HdfsScanner {
  public:
-  HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state,
-      bool add_batches_to_queue);
+  HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
   virtual ~HdfsParquetScanner() {};
 
   /// Issue just the footer range for each file.  We'll then parse the footer and pick
   /// out the columns we want.
-  static Status IssueInitialRanges(HdfsScanNode* scan_node,
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
   virtual Status Open(ScannerContext* context);
@@ -335,7 +334,8 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Codegen ProcessScratchBatch(). Stores the resulting function in
   /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+  static Status Codegen(HdfsScanNodeBase* node,
+      const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** process_scratch_batch_fn);
 
   /// The repetition level is set to this value to indicate the end of a row group.
@@ -438,7 +438,7 @@ class HdfsParquetScanner : public HdfsScanner {
 
   const char* filename() const { return metadata_range_->file(); }
 
-  virtual Status GetNextInternal(RowBatch* row_batch, bool* eos);
+  virtual Status GetNextInternal(RowBatch* row_batch);
 
   /// Advances 'row_group_idx_' to the next non-empty row group and initializes
   /// the column readers to scan it. Recoverable errors are logged to the runtime

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index c670d28..f43b2aa 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -53,9 +53,8 @@ const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1};
 // Macro to convert between SerdeUtil errors to Status returns.
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state,
-    bool add_batches_to_queue)
-    : BaseSequenceScanner(scan_node, state, add_batches_to_queue) {
+HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+    : BaseSequenceScanner(scan_node, state) {
 }
 
 HdfsRCFileScanner::~HdfsRCFileScanner() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index fe83648..4e18bc4 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -233,8 +233,7 @@ class Tuple;
 /// A scanner for reading RCFiles into tuples.
 class HdfsRCFileScanner : public BaseSequenceScanner {
  public:
-  HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state,
-      bool add_batches_to_queue);
+  HdfsRCFileScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
   virtual ~HdfsRCFileScanner();
 
   virtual Status Open(ScannerContext* context);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
new file mode 100644
index 0000000..dba67b5
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -0,0 +1,893 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-scan-node-base.h"
+#include "exec/base-sequence-scanner.h"
+#include "exec/hdfs-text-scanner.h"
+#include "exec/hdfs-lzo-text-scanner.h"
+#include "exec/hdfs-sequence-scanner.h"
+#include "exec/hdfs-rcfile-scanner.h"
+#include "exec/hdfs-avro-scanner.h"
+#include "exec/hdfs-parquet-scanner.h"
+
+#include <sstream>
+#include <avro/errors.h>
+#include <avro/schema.h>
+#include <boost/filesystem.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "codegen/llvm-codegen.h"
+#include "common/logging.h"
+#include "common/object-pool.h"
+#include "exprs/expr-context.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/runtime-state.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
+#include "runtime/row-batch.h"
+#include "runtime/string-buffer.h"
+#include "scheduling/query-resource-mgr.h"
+#include "util/bit-util.h"
+#include "util/container-util.h"
+#include "util/debug-util.h"
+#include "util/disk-info.h"
+#include "util/error-util.h"
+#include "util/hdfs-util.h"
+#include "util/impalad-metrics.h"
+#include "util/periodic-counter-updater.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
+    "that a scan node will wait for expected runtime filters to arrive.");
+DEFINE_bool(suppress_unknown_disk_id_warnings, false,
+    "Suppress unknown disk id warnings generated when the HDFS implementation does not"
+    " provide volume/disk information.");
+
+#ifndef NDEBUG
+DECLARE_bool(skip_file_runtime_filtering);
+#endif
+
+namespace filesystem = boost::filesystem;
+using namespace impala;
+using namespace llvm;
+using namespace strings;
+using boost::algorithm::join;
+
+const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
+    "Hdfs split stats (<volume id>:<# splits>/<split lengths>)";
+
+// Determines how many unexpected remote bytes trigger an error in the runtime state
+const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
+
+HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
+                           const DescriptorTbl& descs)
+    : ScanNode(pool, tnode, descs),
+      runtime_state_(NULL),
+      skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
+          tnode.hdfs_scan_node.skip_header_line_count : 0),
+      tuple_id_(tnode.hdfs_scan_node.tuple_id),
+      reader_context_(NULL),
+      tuple_desc_(NULL),
+      hdfs_table_(NULL),
+      unknown_disk_id_warned_(false),
+      initial_ranges_issued_(false),
+      counters_running_(false),
+      max_compressed_text_file_length_(NULL),
+      disks_accessed_bitmap_(TUnit::UNIT, 0),
+      bytes_read_local_(NULL),
+      bytes_read_short_circuit_(NULL),
+      bytes_read_dn_cache_(NULL),
+      num_remote_ranges_(NULL),
+      unexpected_remote_bytes_(NULL) {
+}
+
+HdfsScanNodeBase::~HdfsScanNodeBase() {
+}
+
+Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+
+  // Add collection item conjuncts
+  for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) {
+    DCHECK(conjuncts_map_[entry.first].empty());
+    RETURN_IF_ERROR(
+        Expr::CreateExprTrees(pool_, entry.second, &conjuncts_map_[entry.first]));
+  }
+
+  const TQueryOptions& query_options = state->query_options();
+  for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
+    auto it = filter.planid_to_target_ndx.find(tnode.node_id);
+    DCHECK(it != filter.planid_to_target_ndx.end());
+    const TRuntimeFilterTargetDesc& target = filter.targets[it->second];
+    if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
+        !target.is_local_target) {
+      continue;
+    }
+    if (query_options.disable_row_runtime_filtering &&
+        !target.is_bound_by_partition_columns) {
+      continue;
+    }
+
+    FilterContext filter_ctx;
+    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr));
+    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false);
+
+    string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id,
+        PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
+    RuntimeProfile* profile = state->obj_pool()->Add(
+        new RuntimeProfile(state->obj_pool(), filter_profile_title));
+    runtime_profile_->AddChild(profile);
+    filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
+        target.is_bound_by_partition_columns));
+
+    filter_ctxs_.push_back(filter_ctx);
+  }
+
+  // Add row batch conjuncts
+  DCHECK(conjuncts_map_[tuple_id_].empty());
+  conjuncts_map_[tuple_id_] = conjunct_ctxs_;
+
+  return Status::OK();
+}
+
+/// TODO: Break up this very long function.
+Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  runtime_state_ = state;
+  RETURN_IF_ERROR(ScanNode::Prepare(state));
+
+  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
+  DCHECK(tuple_desc_ != NULL);
+
+  // Prepare collection conjuncts
+  for (const auto& entry: conjuncts_map_) {
+    TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
+    // conjuncts_ are already prepared in ExecNode::Prepare(), don't try to prepare again
+    if (tuple_desc == tuple_desc_) continue;
+    RowDescriptor* collection_row_desc =
+        state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false));
+    RETURN_IF_ERROR(
+        Expr::Prepare(entry.second, state, *collection_row_desc, expr_mem_tracker()));
+  }
+
+  // One-time initialisation of state that is constant across scan ranges
+  DCHECK(tuple_desc_->table_desc() != NULL);
+  hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
+  scan_node_pool_.reset(new MemPool(mem_tracker()));
+
+  for (FilterContext& filter: filter_ctxs_) {
+    RETURN_IF_ERROR(filter.expr->Prepare(state, row_desc(), expr_mem_tracker()));
+    AddExprCtxToFree(filter.expr);
+  }
+
+  // Parse Avro table schema if applicable
+  const string& avro_schema_str = hdfs_table_->avro_schema();
+  if (!avro_schema_str.empty()) {
+    avro_schema_t avro_schema;
+    int error = avro_schema_from_json_length(
+        avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
+    if (error != 0) {
+      return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
+    }
+    RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
+  }
+
+  // Gather materialized partition-key slots and non-partition slots.
+  const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
+  for (size_t i = 0; i < slots.size(); ++i) {
+    if (hdfs_table_->IsClusteringCol(slots[i])) {
+      partition_key_slots_.push_back(slots[i]);
+    } else {
+      materialized_slots_.push_back(slots[i]);
+    }
+  }
+
+  // Order the materialized slots such that for schemaless file formats (e.g. text) the
+  // order corresponds to the physical order in files. For formats where the file schema
+  // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
+  sort(materialized_slots_.begin(), materialized_slots_.end(),
+      SlotDescriptor::ColPathLessThan);
+
+  // Populate mapping from slot path to index into materialized_slots_.
+  for (int i = 0; i < materialized_slots_.size(); ++i) {
+    path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
+  }
+
+  // Initialize is_materialized_col_
+  is_materialized_col_.resize(hdfs_table_->num_cols());
+  for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
+    is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
+  }
+
+  HdfsFsCache::HdfsFsMap fs_cache;
+  // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
+  // partition_ids_, file_descs_, and per_type_files_.
+  DCHECK(scan_range_params_ != NULL)
+      << "Must call SetScanRanges() before calling Prepare()";
+  int num_ranges_missing_volume_id = 0;
+  for (const TScanRangeParams& params: *scan_range_params_) {
+    DCHECK(params.scan_range.__isset.hdfs_file_split);
+    const THdfsFileSplit& split = params.scan_range.hdfs_file_split;
+    partition_ids_.insert(split.partition_id);
+    HdfsPartitionDescriptor* partition_desc =
+        hdfs_table_->GetPartition(split.partition_id);
+    if (partition_desc == NULL) {
+      // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
+      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
+                 << " partition_id=" << split.partition_id
+                 << "\n" << PrintThrift(state->fragment_params());
+      return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
+                    " Try rerunning the query.");
+    }
+
+    filesystem::path file_path(partition_desc->location());
+    file_path.append(split.file_name, filesystem::path::codecvt());
+    const string& native_file_path = file_path.native();
+
+    HdfsFileDesc* file_desc = NULL;
+    FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path);
+    if (file_desc_it == file_descs_.end()) {
+      // Add new file_desc to file_descs_ and per_type_files_
+      file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
+      file_descs_[native_file_path] = file_desc;
+      file_desc->file_length = split.file_length;
+      file_desc->mtime = split.mtime;
+      file_desc->file_compression = split.file_compression;
+      RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+          native_file_path, &file_desc->fs, &fs_cache));
+      num_unqueued_files_.Add(1);
+      per_type_files_[partition_desc->file_format()].push_back(file_desc);
+    } else {
+      // File already processed
+      file_desc = file_desc_it->second;
+    }
+
+    bool expected_local = params.__isset.is_remote && !params.is_remote;
+    if (expected_local && params.volume_id == -1) {
+      if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) {
+        runtime_profile()->AppendExecOption("Missing Volume Id");
+        runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK));
+        unknown_disk_id_warned_ = true;
+      }
+      ++num_ranges_missing_volume_id;
+    }
+
+    bool try_cache = params.is_cached;
+    if (runtime_state_->query_options().disable_cached_reads) {
+      DCHECK(!try_cache) << "Params should not have had this set.";
+    }
+    file_desc->splits.push_back(
+        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
+            split.offset, split.partition_id, params.volume_id,
+            try_cache, expected_local, file_desc->mtime));
+  }
+
+  // Prepare all the partitions scanned by the scan node
+  for (int64_t partition_id: partition_ids_) {
+    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
+    // This is IMPALA-1702, but will have been caught earlier in this method.
+    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
+                                   << " partition_id=" << partition_id
+                                   << "\n" << PrintThrift(state->fragment_params());
+    RETURN_IF_ERROR(partition_desc->PrepareExprs(state));
+  }
+
+  // Update server wide metrics for number of scan ranges and ranges that have
+  // incomplete metadata.
+  ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
+  ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
+
+  // Add per volume stats to the runtime profile
+  PerVolumnStats per_volume_stats;
+  stringstream str;
+  UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
+  PrintHdfsSplitStats(per_volume_stats, &str);
+  runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
+
+  // Create codegen'd functions
+  for (int format = THdfsFileFormat::TEXT;
+       format <= THdfsFileFormat::PARQUET; ++format) {
+    vector<HdfsFileDesc*>& file_descs =
+        per_type_files_[static_cast<THdfsFileFormat::type>(format)];
+
+    if (file_descs.empty()) continue;
+
+    // Randomize the order this node processes the files. We want to do this to avoid
+    // issuing remote reads to the same DN from different impalads. In file formats such
+    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
+    // If every node goes through the files in the same order, all the remote reads are
+    // for the same file meaning a few DN serves a lot of remote reads at the same time.
+    random_shuffle(file_descs.begin(), file_descs.end());
+
+    // Create reusable codegen'd functions for each file type type needed
+    // TODO: do this for conjuncts_map_
+    Function* fn;
+    Status status;
+    switch (format) {
+      case THdfsFileFormat::TEXT:
+        status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn);
+        break;
+      case THdfsFileFormat::SEQUENCE_FILE:
+        status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn);
+        break;
+      case THdfsFileFormat::AVRO:
+        status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn);
+        break;
+      case THdfsFileFormat::PARQUET:
+        status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, &fn);
+        break;
+      default:
+        // No codegen for this format
+        fn = NULL;
+        status = Status("Not implemented for this format.");
+    }
+    DCHECK(fn != NULL || !status.ok());
+
+    const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
+    if (!status.ok()) {
+      runtime_profile()->AddCodegenMsg(false, status, format_name);
+    } else {
+      runtime_profile()->AddCodegenMsg(true, status, format_name);
+      LlvmCodeGen* codegen;
+      RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
+      codegen->AddFunctionToJit(
+          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
+    }
+  }
+
+  return Status::OK();
+}
+
+Status HdfsScanNodeBase::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(ExecNode::Open(state));
+
+  if (file_descs_.empty()) return Status::OK();
+
+  // Open collection conjuncts
+  for (const auto& entry: conjuncts_map_) {
+    // conjuncts_ are already opened in ExecNode::Open()
+    if (entry.first == tuple_id_) continue;
+    RETURN_IF_ERROR(Expr::Open(entry.second, state));
+  }
+
+  for (FilterContext& filter: filter_ctxs_) RETURN_IF_ERROR(filter.expr->Open(state));
+
+  // Open all the partition exprs used by the scan node and create template tuples.
+  for (int64_t partition_id: partition_ids_) {
+    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
+    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
+                                   << " partition_id=" << partition_id
+                                   << "\n" << PrintThrift(state->fragment_params());
+    RETURN_IF_ERROR(partition_desc->OpenExprs(state));
+    vector<ExprContext*> partition_key_value_ctxs;
+    RETURN_IF_ERROR(Expr::CloneIfNotExists(
+        partition_desc->partition_key_value_ctxs(), state, &partition_key_value_ctxs));
+    partition_template_tuple_map_[partition_id] =
+        InitTemplateTuple(partition_key_value_ctxs, scan_node_pool_.get(), state);
+    Expr::Close(partition_key_value_ctxs, state);
+  }
+
+  RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
+      &reader_context_, mem_tracker()));
+
+  // Initialize HdfsScanNode specific counters
+  // TODO: Revisit counters and move the counters specific to multi-threaded scans
+  // into HdfsScanNode.
+  read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
+  per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
+      PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
+      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
+  scan_ranges_complete_counter_ =
+      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
+  if (DiskInfo::num_disks() < 64) {
+    num_disks_accessed_counter_ =
+        ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
+  } else {
+    num_disks_accessed_counter_ = NULL;
+  }
+  num_scanner_threads_started_counter_ =
+      ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
+
+  runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, bytes_read_counter());
+  runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
+  runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
+      &active_hdfs_read_thread_counter_);
+  runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
+      &disks_accessed_bitmap_);
+
+  average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
+      AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
+  average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
+      AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
+
+  bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
+      TUnit::BYTES);
+  bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
+      TUnit::BYTES);
+  bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
+      TUnit::BYTES);
+  num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
+      TUnit::UNIT);
+  unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
+      TUnit::BYTES);
+
+  max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
+      "MaxCompressedTextFileLength", TUnit::BYTES);
+
+  for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
+    hdfs_read_thread_concurrency_bucket_.push_back(
+        pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
+  }
+  runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
+      &hdfs_read_thread_concurrency_bucket_);
+
+  counters_running_ = true;
+
+  int64_t total_splits = 0;
+  for (const auto& fd: file_descs_) total_splits += fd.second->splits.size();
+  progress_.Init(Substitute("Splits complete (node=$0)", total_splits), total_splits);
+  return Status::OK();
+}
+
+Status HdfsScanNodeBase::Reset(RuntimeState* state) {
+  DCHECK(false) << "Internal error: Scan nodes should not appear in subplans.";
+  return Status("Internal error: Scan nodes should not appear in subplans.");
+}
+
+void HdfsScanNodeBase::Close(RuntimeState* state) {
+  if (is_closed()) return;
+
+  if (reader_context_ != NULL) {
+    // There may still be io buffers used by parent nodes so we can't unregister the
+    // reader context yet. The runtime state keeps a list of all the reader contexts and
+    // they are unregistered when the fragment is closed.
+    state->reader_contexts()->push_back(reader_context_);
+    // Need to wait for all the active scanner threads to finish to ensure there is no
+    // more memory tracked by this scan node's mem tracker.
+    state->io_mgr()->CancelContext(reader_context_, true);
+  }
+
+  StopAndFinalizeCounters();
+
+  // There should be no active scanner threads and hdfs read threads.
+  DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
+  DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
+
+  if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
+
+  // Close all the partitions scanned by the scan node
+  for (int64_t partition_id: partition_ids_) {
+    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
+    if (partition_desc == NULL) {
+      // TODO: Revert when IMPALA-1702 is fixed.
+      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
+                 << " partition_id=" << partition_id
+                 << "\n" << PrintThrift(state->fragment_params());
+      continue;
+    }
+    partition_desc->CloseExprs(state);
+  }
+
+  // Close collection conjuncts
+  for (const auto& tid_conjunct: conjuncts_map_) {
+    // conjuncts_ are already closed in ExecNode::Close()
+    if (tid_conjunct.first == tuple_id_) continue;
+    Expr::Close(tid_conjunct.second, state);
+  }
+
+  for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr->Close(state);
+  ScanNode::Close(state);
+}
+
+Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
+  DCHECK(!initial_ranges_issued_);
+  initial_ranges_issued_ = true;
+
+  // No need to issue ranges with limit 0.
+  if (ReachedLimit()) {
+    DCHECK_EQ(limit_, 0);
+    return Status::OK();
+  }
+
+  int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
+  if (state->query_options().runtime_filter_wait_time_ms > 0) {
+    wait_time_ms = state->query_options().runtime_filter_wait_time_ms;
+  }
+  if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms);
+  // Apply dynamic partition-pruning per-file.
+  FileFormatsMap matching_per_type_files;
+  for (const FileFormatsMap::value_type& v: per_type_files_) {
+    vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first];
+    for (HdfsFileDesc* file: v.second) {
+      if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) {
+        matching_files->push_back(file);
+      }
+    }
+  }
+
+  // Issue initial ranges for all file types.
+  RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::PARQUET]));
+  RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::TEXT]));
+  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
+  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::RC_FILE]));
+  RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::AVRO]));
+
+  return Status::OK();
+}
+
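+// Checks the file's partition (taken from its first split's metadata) against the
+// arrived partition-column filters. If the partition is rejected, every split of the
+// file is marked complete so that progress is still reported, and false is returned.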
+bool HdfsScanNodeBase::FilePassesFilterPredicates(const vector<FilterContext>& filter_ctxs,
+    const THdfsFileFormat::type& format, HdfsFileDesc* file) {
+#ifndef NDEBUG
+  if (FLAGS_skip_file_runtime_filtering) return true;
+#endif
+  if (filter_ctxs_.size() == 0) return true;
+  ScanRangeMetadata* metadata =
+      static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
+  if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
+          filter_ctxs)) {
+    for (int j = 0; j < file->splits.size(); ++j) {
+      // Mark range as complete to ensure progress.
+      RangeComplete(format, file->file_compression);
+    }
+    return false;
+  }
+  return true;
+}
+
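+// Waits for the expected runtime filters to arrive, passing 'time_ms' to each
+// WaitForArrival() call, records which filters arrived in the runtime profile, and
+// returns true only if all of them arrived.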
+bool HdfsScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) {
+  vector<string> arrived_filter_ids;
+  int32_t start = MonotonicMillis();
+  for (auto& ctx: filter_ctxs_) {
+    if (ctx.filter->WaitForArrival(time_ms)) {
+      arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id()));
+    }
+  }
+  int32_t end = MonotonicMillis();
+  const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS);
+
+  if (arrived_filter_ids.size() == filter_ctxs_.size()) {
+    runtime_profile()->AddInfoString("Runtime filters",
+        Substitute("All filters arrived. Waited $0", wait_time));
+    VLOG_QUERY << "Filters arrived. Waited " << wait_time;
+    return true;
+  }
+
+  const string& filter_str = Substitute("Only following filters arrived: $0, waited $1",
+      join(arrived_filter_ids, ", "), wait_time);
+  runtime_profile()->AddInfoString("Runtime filters", filter_str);
+  VLOG_QUERY << filter_str;
+  return false;
+}
+
+DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(
+    hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
+    int disk_id, bool try_cache, bool expected_local, int64_t mtime,
+    const DiskIoMgr::ScanRange* original_split) {
+  DCHECK_GE(disk_id, -1);
+  // Require that the scan range is within [0, file_length). While this cannot be used
+  // to guarantee safety (file_length metadata may be stale), it avoids different
+  // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns an error when
+  // seeking beyond the end of the file).
+  DCHECK_GE(offset, 0);
+  DCHECK_GE(len, 0);
+  DCHECK_LE(offset + len, GetFileDesc(file)->file_length)
+      << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
+  disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
+
+  ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
+        new ScanRangeMetadata(partition_id, original_split));
+  DiskIoMgr::ScanRange* range =
+      runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
+  range->Reset(fs, file, len, offset, disk_id, try_cache, expected_local,
+      mtime, metadata);
+  return range;
+}
+
+Status HdfsScanNodeBase::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
+    int num_files_queued) {
+  RETURN_IF_ERROR(
+      runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
+  num_unqueued_files_.Add(-num_files_queued);
+  DCHECK_GE(num_unqueued_files_.Load(), 0);
+  return Status::OK();
+}
+
+HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(const string& filename) {
+  DCHECK(file_descs_.find(filename) != file_descs_.end());
+  return file_descs_[filename];
+}
+
+void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
+  CodegendFnMap::iterator it = codegend_fn_map_.find(type);
+  if (it == codegend_fn_map_.end()) return NULL;
+  return it->second;
+}
+
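+// Instantiates the scanner matching the partition's file format (and, for text files,
+// the compression codec), then opens it. On failure, the scanner or its streams are
+// cleaned up before the error is returned.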
+Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+    ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
+  DCHECK(context != NULL);
+  THdfsCompression::type compression =
+      context->GetStream()->file_desc()->file_compression;
+
+  // Create a new scanner for this file format and compression.
+  switch (partition->file_format()) {
+    case THdfsFileFormat::TEXT:
+      // Lzo-compressed text files are scanned by a scanner that is implemented as a
+      // dynamic library, so that Impala does not include GPL code.
+      if (compression == THdfsCompression::LZO) {
+        scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_));
+      } else {
+        scanner->reset(new HdfsTextScanner(this, runtime_state_));
+      }
+      break;
+    case THdfsFileFormat::SEQUENCE_FILE:
+      scanner->reset(new HdfsSequenceScanner(this, runtime_state_));
+      break;
+    case THdfsFileFormat::RC_FILE:
+      scanner->reset(new HdfsRCFileScanner(this, runtime_state_));
+      break;
+    case THdfsFileFormat::AVRO:
+      scanner->reset(new HdfsAvroScanner(this, runtime_state_));
+      break;
+    case THdfsFileFormat::PARQUET:
+      scanner->reset(new HdfsParquetScanner(this, runtime_state_));
+      break;
+    default:
+      return Status(Substitute("Unknown Hdfs file format type: $0",
+          partition->file_format()));
+  }
+  DCHECK(scanner->get() != NULL);
+  Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
+  if (status.ok()) {
+    status = scanner->get()->Open(context);
+    if (!status.ok()) scanner->get()->Close(scanner->get()->batch());
+  } else {
+    context->ClearStreams();
+  }
+  return status;
+}
+
+Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ExprContext*>& value_ctxs,
+    MemPool* pool, RuntimeState* state) const {
+  if (partition_key_slots_.empty()) return NULL;
+  Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool);
+  for (int i = 0; i < partition_key_slots_.size(); ++i) {
+    const SlotDescriptor* slot_desc = partition_key_slots_[i];
+    ExprContext* value_ctx = value_ctxs[slot_desc->col_pos()];
+    // This function may be called from multiple threads, and we expect each
+    // thread to pass in its own cloned value contexts.
+    DCHECK(value_ctx->is_clone());
+    // Exprs guaranteed to be literals, so can safely be evaluated without a row.
+    RawValue::Write(value_ctx->GetValue(NULL), template_tuple, slot_desc, NULL);
+  }
+  return template_tuple;
+}
+
+void HdfsScanNodeBase::InitNullCollectionValues(const TupleDescriptor* tuple_desc,
+    Tuple* tuple) const {
+  for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) {
+    CollectionValue* slot = reinterpret_cast<CollectionValue*>(
+        tuple->GetSlot(slot_desc->tuple_offset()));
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) {
+      *slot = CollectionValue();
+      continue;
+    }
+    // Recursively traverse collection items.
+    const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor();
+    if (item_desc->collection_slots().empty()) continue;
+    for (int i = 0; i < slot->num_tuples; ++i) {
+      int item_offset = i * item_desc->byte_size();
+      Tuple* collection_item = reinterpret_cast<Tuple*>(slot->ptr + item_offset);
+      InitNullCollectionValues(item_desc, collection_item);
+    }
+  }
+}
+
+void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
+  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
+  const TupleDescriptor& tuple_desc =
+      *row_batch->row_desc().tuple_descriptors()[tuple_idx()];
+  if (tuple_desc.collection_slots().empty()) return;
+  for (int i = 0; i < row_batch->num_rows(); ++i) {
+    Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
+    DCHECK(tuple != NULL);
+    InitNullCollectionValues(&tuple_desc, tuple);
+  }
+}
+
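+// Evaluates the filters that are bound by partition columns against this partition's
+// template tuple, updating the per-filter stats for each filter evaluated. Returns
+// false as soon as any such filter rejects the partition.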
+bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
+    const string& stats_name, const vector<FilterContext>& filter_ctxs) {
+  if (filter_ctxs.size() == 0) return true;
+  DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
+      << "Mismatched number of filter contexts";
+  Tuple* template_tuple = partition_template_tuple_map_[partition_id];
+  // Defensive - if template_tuple is NULL, there can be no filters on partition columns.
+  if (template_tuple == NULL) return true;
+  TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
+  for (const FilterContext& ctx: filter_ctxs) {
+    int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
+    if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
+      continue;
+    }
+    void* e = ctx.expr->GetValue(tuple_row_mem);
+
+    // Not quite right because the bitmap could arrive after Eval(), but we're OK with
+    // off-by-one errors.
+    bool processed = ctx.filter->HasBloomFilter();
+    bool passed_filter = ctx.filter->Eval<void>(e, ctx.expr->root()->type());
+    ctx.stats->IncrCounters(stats_name, 1, processed, !passed_filter);
+    if (!passed_filter) return false;
+  }
+
+  return true;
+}
+
+void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
+    const THdfsCompression::type& compression_type) {
+  vector<THdfsCompression::type> types;
+  types.push_back(compression_type);
+  RangeComplete(file_type, types);
+}
+
+void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
+    const vector<THdfsCompression::type>& compression_types) {
+  scan_ranges_complete_counter()->Add(1);
+  progress_.Update(1);
+  for (int i = 0; i < compression_types.size(); ++i) {
+    ++file_type_counts_[make_pair(file_type, compression_types[i])];
+  }
+}
+
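+// A slot's materialization order is the index of the first conjunct that references
+// it; slots referenced by no conjunct keep the default order conjuncts.size() and can
+// be materialized after all conjuncts have been evaluated.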
+void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
+  const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs();
+  // Initialize each entry in 'order' to conjuncts.size() (after the last conjunct).
+  order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
+
+  const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
+
+  vector<SlotId> slot_ids;
+  for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
+    slot_ids.clear();
+    int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
+    for (int j = 0; j < num_slots; ++j) {
+      SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
+      int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());
+      // slot_idx == -1 means this was a partition key slot, which is always
+      // materialized before any other slots.
+      if (slot_idx == -1) continue;
+      // If this slot hasn't been assigned an order yet, assign it to be materialized
+      // before evaluating conjuncts[conjunct_idx].
+      if ((*order)[slot_idx] == conjuncts.size()) {
+        (*order)[slot_idx] = conjunct_idx;
+      }
+    }
+  }
+}
+
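+// Accumulates, keyed by volume id, the number of splits and the total split length in
+// bytes across the given scan range params.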
+void HdfsScanNodeBase::UpdateHdfsSplitStats(
+    const vector<TScanRangeParams>& scan_range_params_list,
+    PerVolumnStats* per_volume_stats) {
+  pair<int, int64_t> init_value(0, 0);
+  for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
+    const TScanRange& scan_range = scan_range_params.scan_range;
+    if (!scan_range.__isset.hdfs_file_split) continue;
+    const THdfsFileSplit& split = scan_range.hdfs_file_split;
+    pair<int, int64_t>* stats =
+        FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
+    ++(stats->first);
+    stats->second += split.length;
+  }
+}
+
+void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
+    stringstream* ss) {
+  for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
+       i != per_volume_stats.end(); ++i) {
+     (*ss) << i->first << ":" << i->second.first << "/"
+         << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";
+  }
+}
+
+void HdfsScanNodeBase::StopAndFinalizeCounters() {
+  if (!counters_running_) return;
+  counters_running_ = false;
+
+  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
+  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
+  PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
+  PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
+  PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
+      true);
+
+  // Output hdfs read thread concurrency into info string
+  stringstream ss;
+  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
+    ss << i << ":" << setprecision(4)
+       << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
+  }
+  runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
+
+  // Convert the disk access bitmap to the number of disks accessed.
+  uint64_t num_disk_bitmap = disks_accessed_bitmap_.value();
+  int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap);
+  if (num_disks_accessed_counter_ != NULL) {
+    num_disks_accessed_counter_->Set(num_disk_accessed);
+  }
+
+  // Output completed file types and counts to the info string.
+  if (!file_type_counts_.empty()) {
+    stringstream ss;
+    for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
+        it != file_type_counts_.end(); ++it) {
+      ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
+    }
+    runtime_profile_->AddInfoString("File Formats", ss.str());
+  }
+
+  // Output fraction of scanners with codegen enabled
+  int num_enabled = num_scanners_codegen_enabled_.Load();
+  int total = num_enabled + num_scanners_codegen_disabled_.Load();
+  runtime_profile()->AppendExecOption(
+      Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
+
+  if (reader_context_ != NULL) {
+    bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
+    bytes_read_short_circuit_->Set(
+        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
+    bytes_read_dn_cache_->Set(
+        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
+    num_remote_ranges_->Set(static_cast<int64_t>(
+        runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
+    unexpected_remote_bytes_->Set(
+        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
+
+    if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
+      runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
+          "Read $0 of data across network that was expected to be local. "
+          "Block locality metadata for table '$1.$2' may be stale. Consider running "
+          "\"INVALIDATE METADATA `$1`.`$2`\".",
+          PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES),
+          hdfs_table_->database(), hdfs_table_->name())));
+    }
+
+    ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value());
+    ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment(
+        bytes_read_local_->value());
+    ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment(
+        bytes_read_short_circuit_->value());
+    ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
+        bytes_read_dn_cache_->value());
+  }
+}
+
+Status HdfsScanNodeBase::TriggerDebugAction() {
+  return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
+}