You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/08 04:39:36 UTC

[6/7] incubator-impala git commit: IMPALA-3905: Add single-threaded scan node.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
new file mode 100644
index 0000000..7ea4b9d
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -0,0 +1,459 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_
+#define IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_
+
+#include <stdint.h>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/unordered_map.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/filter-context.h"
+#include "exec/scan-node.h"
+#include "runtime/descriptors.h"
+#include "runtime/disk-io-mgr.h"
+#include "util/avro-util.h"
+#include "util/progress-updater.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class ScannerContext;
+class DescriptorTbl;
+class HdfsScanner;
+class RowBatch;
+class Status;
+class Tuple;
+class TPlanNode;
+class TScanRange;
+
+/// Maintains per file information for files assigned to this scan node.  This includes
+/// all the splits for the file. Note that it is not thread-safe.
+struct HdfsFileDesc {
+  HdfsFileDesc(const std::string& filename)
+    : fs(NULL), filename(filename), file_length(0), mtime(0),
+      file_compression(THdfsCompression::NONE) {
+  }
+
+  /// Connection to the filesystem containing the file.
+  hdfsFS fs;
+
+  /// File name including the path.
+  std::string filename;
+
+  /// Length of the file. This is not related to which parts of the file have been
+  /// assigned to this node.
+  int64_t file_length;
+
+  /// Last modified time
+  int64_t mtime;
+
+  THdfsCompression::type file_compression;
+
+  /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
+  std::vector<DiskIoMgr::ScanRange*> splits;
+};
+
+/// Struct for additional metadata for scan ranges. This contains the partition id
+/// that this scan range is for.
+struct ScanRangeMetadata {
+  /// The partition id that this range is part of.
+  int64_t partition_id;
+
+  /// For parquet scan ranges we initially create a request for the file footer for each
+  /// split; we store a pointer to the actual split so that we can recover its information
+  /// for the scanner to process.
+  const DiskIoMgr::ScanRange* original_split;
+
+  ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split)
+      : partition_id(partition_id), original_split(original_split) { }
+};
+
+
+/// Base class for all Hdfs scan nodes. Contains common members and functions
+/// that are independent of whether batches are materialized by the main thread
+/// (via HdfsScanner::GexNext()) or by spinning up separate threads that feed
+/// into a RowBatch queue (via HdfsScanner::ProcessSplit()). Those specifics
+/// are expected to be implemented in subclasses.
+///
+/// Subclasses may expect to receive runtime filters produced elsewhere in the plan
+/// (even from remote fragments). These filters arrive asynchronously during execution,
+/// and are applied as soon as they arrive. Filters may be applied by the scan node in
+/// the following scopes:
+///
+/// 1. Per-file (all file formats, partition column filters only) - filtering at this
+/// scope saves IO as the filters are applied before scan ranges are issued.
+/// 2. Per-scan-range (all file formats, partition column filters only) - filtering at
+/// this scope saves CPU as filtered scan ranges are never scanned.
+///
+/// Scanners may also use the same filters to eliminate rows at finer granularities
+/// (e.g. per row).
+///
+/// TODO: Revisit and minimize metrics. Move those specific to legacy multi-threaded
+/// scans into HdfsScanNode.
+/// TODO: Once the legacy scan node has been removed, several functions can be made
+/// non-virtual. Also merge this class with HdfsScanNodeMt.
+class HdfsScanNodeBase : public ScanNode {
+ public:
+  HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~HdfsScanNodeBase();
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state);
+  virtual Status Open(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state);
+  virtual void Close(RuntimeState* state);
+
+  /// Returns true if this node uses separate threads for scanners that append RowBatches
+  /// to a queue, false otherwise.
+  virtual bool HasRowBatchQueue() const = 0;
+
+  int limit() const { return limit_; }
+
+  const std::vector<SlotDescriptor*>& materialized_slots()
+      const { return materialized_slots_; }
+
+  /// Returns the tuple idx into the row for this scan node to output to.
+  /// Currently this is always 0.
+  int tuple_idx() const { return 0; }
+
+  /// Returns number of partition keys in the table.
+  int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
+
+  /// Returns number of partition key slots.
+  int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
+
+  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
+  const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
+  const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
+  RuntimeState* runtime_state() { return runtime_state_; }
+  int skip_header_line_count() const { return skip_header_line_count_; }
+  DiskIoRequestContext* reader_context() { return reader_context_; }
+
+  typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
+  const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
+
+  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
+    return max_compressed_text_file_length_;
+  }
+
+  const static int SKIP_COLUMN = -1;
+
+  /// Returns index into materialized_slots with 'path'.  Returns SKIP_COLUMN if
+  /// that path is not materialized.
+  int GetMaterializedSlotIdx(const std::vector<int>& path) const {
+    PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
+    if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
+    return result->second;
+  }
+
+  /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
+  /// column i should be materialized.
+  const bool* is_materialized_col() {
+    return reinterpret_cast<const bool*>(&is_materialized_col_[0]);
+  }
+
+  /// Returns the per format codegen'd function.  Scanners call this to get the
+  /// codegen'd function to use.  Returns NULL if codegen should not be used.
+  void* GetCodegenFn(THdfsFileFormat::type);
+
+  inline void IncNumScannersCodegenEnabled() { num_scanners_codegen_enabled_.Add(1); }
+  inline void IncNumScannersCodegenDisabled() { num_scanners_codegen_disabled_.Add(1); }
+
+  /// Allocate a new scan range object, stored in the runtime state's object pool. For
+  /// scan ranges that correspond to the original hdfs splits, the partition id must be
+  /// set to the range's partition id. For other ranges (e.g. columns in parquet, read
+  /// past buffers), the partition_id is unused. expected_local should be true if this
+  /// scan range is not expected to require a remote read. The range must fall within
+  /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length.
+  /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
+  /// metadata of the scan range that is to be allocated.
+  /// This is thread safe.
+  DiskIoMgr::ScanRange* AllocateScanRange(
+      hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
+      int disk_id, bool try_cache, bool expected_local, int64_t mtime,
+      const DiskIoMgr::ScanRange* original_split = NULL);
+
+  /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many file's scan
+  /// ranges have been added completely.  A file's scan ranges are added completely if no
+  /// new scanner threads will be needed to process that file besides the additional
+  /// threads needed to process those in 'ranges'.
+  /// Can be overridden to add scan-node specific actions like starting scanner threads.
+  virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+      int num_files_queued);
+
+  /// Adds all splits for file_desc to the io mgr queue and indicates one file has
+  /// been added completely.
+  inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc) {
+    return AddDiskIoRanges(file_desc->splits, 1);
+  }
+
+  /// Allocates and initializes a new template tuple allocated from pool with values
+  /// from the partition columns for the current scan range, if any,
+  /// Returns NULL if there are no partition keys slots.
+  Tuple* InitTemplateTuple(const std::vector<ExprContext*>& value_ctxs,
+      MemPool* pool, RuntimeState* state) const;
+
+  /// Returns the file desc for 'filename'.  Returns NULL if filename is invalid.
+  HdfsFileDesc* GetFileDesc(const std::string& filename);
+
+  /// Called by scanners when a range is complete. Used to record progress.
+  /// This *must* only be called after a scanner has completely finished its
+  /// scan range (i.e. context->Flush()), and has returned the final row batch.
+  /// Otherwise, scan nodes using a RowBatch queue may lose the last batch due
+  /// to racing with shutting down the queue.
+  void RangeComplete(const THdfsFileFormat::type& file_type,
+      const THdfsCompression::type& compression_type);
+  /// Same as above except for when multiple compression codecs were used
+  /// in the file. The metrics are incremented for each compression_type.
+  virtual void RangeComplete(const THdfsFileFormat::type& file_type,
+      const std::vector<THdfsCompression::type>& compression_type);
+
+  /// Utility function to compute the order in which to materialize slots to allow for
+  /// computing conjuncts as slots get materialized (on partial tuples).
+  /// 'order' will contain for each slot, the first conjunct it is associated with.
+  /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
+  /// evaluating conjuncts[1].  Slots that are not referenced by any conjuncts will have
+  /// order set to conjuncts.size()
+  void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
+
+  /// Returns true if there are no materialized slots, such as a count(*) over the table.
+  inline bool IsZeroSlotTableScan() const {
+    return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
+  }
+
+  /// map from volume id to <number of split, per volume split lengths>
+  typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats;
+
+  /// Update the per volume stats with the given scan range params list
+  static void UpdateHdfsSplitStats(
+      const std::vector<TScanRangeParams>& scan_range_params_list,
+      PerVolumnStats* per_volume_stats);
+
+  /// Output the per_volume_stats to stringstream. The output format is a list of:
+  /// <volume id>:<# splits>/<per volume split lengths>
+  static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
+      std::stringstream* ss);
+
+  /// Description string for the per volume stats output.
+  static const std::string HDFS_SPLIT_STATS_DESC;
+
+  /// Returns true if partition 'partition_id' passes all the filter predicates in
+  /// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the
+  /// counter groups in FilterStats, and is used to update the correct statistics.
+  ///
+  /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the
+  /// function returns true, or a set of filter contexts to evaluate.
+  bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
+      const std::vector<FilterContext>& filter_ctxs);
+
+  const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
+
+ protected:
+  friend class ScannerContext;
+  friend class HdfsScanner;
+
+  RuntimeState* runtime_state_;
+
+  // Number of header lines to skip at the beginning of each file of this table. Only set
+  // to values > 0 for hdfs text files.
+  const int skip_header_line_count_;
+
+  /// Tuple id resolved in Prepare() to set tuple_desc_
+  const int tuple_id_;
+
+  /// RequestContext object to use with the disk-io-mgr for reads.
+  DiskIoRequestContext* reader_context_;
+
+  /// Descriptor for tuples this scan node constructs
+  const TupleDescriptor* tuple_desc_;
+
+  /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
+  /// the partition columns for that partition materialized. Used to filter files and scan
+  /// ranges on partition-column filters. Populated in Prepare().
+  boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
+
+  /// Descriptor for the hdfs table, including partition and format metadata.
+  /// Set in Prepare, owned by RuntimeState
+  const HdfsTableDescriptor* hdfs_table_;
+
+  /// The root of the table's Avro schema, if we're scanning an Avro table.
+  ScopedAvroSchemaElement avro_schema_;
+
+  /// If true, the warning that some disk ids are unknown was logged.  Only log this once
+  /// per scan node since it can be noisy.
+  bool unknown_disk_id_warned_;
+
+  /// Partitions scanned by this scan node.
+  std::unordered_set<int64_t> partition_ids_;
+
+  /// File path => file descriptor (which includes the file's splits)
+  typedef std::map<std::string, HdfsFileDesc*> FileDescMap;
+  FileDescMap file_descs_;
+
+  /// File format => file descriptors.
+  typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap;
+  FileFormatsMap per_type_files_;
+
+  /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
+  /// item tuples). Includes a copy of ExecNode.conjuncts_.
+  ConjunctsMap conjuncts_map_;
+
+  /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
+  /// the first call to GetNext(). The token manager, in a different thread, will read
+  /// this variable.
+  bool initial_ranges_issued_;
+
+  /// Number of files that have not been issued from the scanners.
+  AtomicInt32 num_unqueued_files_;
+
+  /// Per scanner type codegen'd fn.
+  typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap;
+  CodegendFnMap codegend_fn_map_;
+
+  /// Maps from a slot's path to its index into materialized_slots_.
+  typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
+  PathToSlotIdxMap path_to_materialized_slot_idx_;
+
+  /// List of contexts for expected runtime filters for this scan node. These contexts are
+  /// cloned by individual scanners to be used in multi-threaded contexts, passed through
+  /// the per-scanner ScannerContext..
+  std::vector<FilterContext> filter_ctxs_;
+
+  /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
+  /// for 0 <= i < total # columns in table
+  //
+  /// This should be a vector<bool>, but bool vectors are special-cased and not stored
+  /// internally as arrays, so instead we store as chars and cast to bools as needed
+  std::vector<char> is_materialized_col_;
+
+  /// Vector containing slot descriptors for all non-partition key slots.  These
+  /// descriptors are sorted in order of increasing col_pos.
+  std::vector<SlotDescriptor*> materialized_slots_;
+
+  /// Vector containing slot descriptors for all partition key slots.
+  std::vector<SlotDescriptor*> partition_key_slots_;
+
+  /// Keeps track of total splits and the number finished.
+  ProgressUpdater progress_;
+
+  /// Counters which track the number of scanners that have codegen enabled for the
+  /// materialize and conjuncts evaluation code paths.
+  AtomicInt32 num_scanners_codegen_enabled_;
+  AtomicInt32 num_scanners_codegen_disabled_;
+
+  /// This is the number of io buffers that are owned by the scan node and the scanners.
+  /// This is used just to help debug leaked io buffers to determine if the leak is
+  /// happening in the scanners vs other parts of the execution.
+  /// TODO: Remove this counter when deprecating the multi-threaded scan node.
+  AtomicInt32 num_owned_io_buffers_;
+
+  /// If true, counters are actively running and need to be reported in the runtime
+  /// profile.
+  bool counters_running_;
+
+  /// The size of the largest compressed text file to be scanned. This is used to
+  /// estimate scanner thread memory usage.
+  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_;
+
+  /// Disk accessed bitmap
+  RuntimeProfile::Counter disks_accessed_bitmap_;
+
+  /// Total number of bytes read locally
+  RuntimeProfile::Counter* bytes_read_local_;
+
+  /// Total number of bytes read via short circuit read
+  RuntimeProfile::Counter* bytes_read_short_circuit_;
+
+  /// Total number of bytes read from data node cache
+  RuntimeProfile::Counter* bytes_read_dn_cache_;
+
+  /// Total number of remote scan ranges
+  RuntimeProfile::Counter* num_remote_ranges_;
+
+  /// Total number of bytes read remotely that were expected to be local
+  RuntimeProfile::Counter* unexpected_remote_bytes_;
+
+  /// Pool for allocating some amounts of memory that is shared between scanners.
+  /// e.g. partition key tuple and their string buffers
+  boost::scoped_ptr<MemPool> scan_node_pool_;
+
+  /// Status of failed operations.  This is set in the ScannerThreads
+  /// Returned in GetNext() if an error occurred.  An non-ok status triggers cleanup
+  /// scanner threads.
+  Status status_;
+
+  /// Mapping of file formats (file type, compression type) to the number of
+  /// splits of that type and the lock protecting it.
+  typedef std::map<
+      std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
+  FileTypeCountsMap file_type_counts_;
+
+  /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
+  /// issues initial ranges for all file types. Waits for runtime filters if necessary.
+  /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
+  Status IssueInitialScanRanges(RuntimeState* state);
+
+  /// Create and open new scanner for this partition type.
+  /// If the scanner is successfully created, it is returned in 'scanner'.
+  /// Passes 'add_batches_to_queue' to the scanner constructor.
+  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
+
+  /// Recursively initializes all NULL collection slots to an empty CollectionValue in
+  /// addition to maintaining the null bit. Hack to allow UnnestNode to project out
+  /// collection slots. Assumes that the null bit has already been un/set.
+  /// TODO: Remove this function once the TODOs in UnnestNode regarding projection
+  /// have been addressed.
+  void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const;
+
+  /// Helper to call InitNullCollectionValues() on all tuples produced by this scan
+  /// in 'row_batch'.
+  void InitNullCollectionValues(RowBatch* row_batch) const;
+
+  /// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered
+  /// and therefore not processed. 'file_type' is the the format of 'file', and is used
+  /// for bookkeeping. Returns true if all filters pass or are not present.
+  bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs,
+      const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
+
+  /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns
+  /// true if all filters arrived within the time limit (as measured from the time of
+  /// RuntimeFilterBank::RegisterFilter()), false otherwise.
+  bool WaitForRuntimeFilters(int32_t time_ms);
+
+  /// Stops periodic counters and aggregates counter values for the entire scan node.
+  /// This should be called as soon as the scan node is complete to get the most accurate
+  /// counter values.
+  /// This can be called multiple times, subsequent calls will be ignored.
+  /// This must be called on Close() to unregister counters.
+  /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
+  void StopAndFinalizeCounters();
+
+  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
+  /// for the query.
+  Status TriggerDebugAction();
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
new file mode 100644
index 0000000..8c61326
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-scan-node-mt.h"
+
+#include <sstream>
+
+#include "runtime/runtime-state.h"
+#include "runtime/row-batch.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+using std::stringstream;
+
+namespace impala {
+
+HdfsScanNodeMt::HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode,
+                           const DescriptorTbl& descs)
+    : HdfsScanNodeBase(pool, tnode, descs),
+      scan_range_(NULL),
+      scanner_(NULL) {
+}
+
+HdfsScanNodeMt::~HdfsScanNodeMt() {
+}
+
+Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
+  // Return an error if this scan node has been assigned a range that is not supported
+  // because the scanner of the corresponding file format does implement GetNext().
+  for (const auto& files: per_type_files_) {
+    if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET) {
+      stringstream msg;
+      msg << "Unsupported file format with HdfsScanNodeMt: " << files.first;
+      return Status(msg.str());
+    }
+  }
+  return Status::OK();
+}
+
+Status HdfsScanNodeMt::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
+  DCHECK(!initial_ranges_issued_);
+  RETURN_IF_ERROR(IssueInitialScanRanges(state));
+  return Status::OK();
+}
+
+Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  DCHECK(scan_range_ == NULL || scanner_ != NULL);
+  if (scan_range_ == NULL || scanner_->eos()) {
+    if (scanner_ != NULL && scanner_->eos()) {
+      scanner_->Close(row_batch);
+      scanner_.reset();
+    }
+    RETURN_IF_ERROR(
+        runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range_));
+    if (scan_range_ == NULL) {
+      *eos = true;
+      StopAndFinalizeCounters();
+      return Status::OK();
+    }
+    ScanRangeMetadata* metadata =
+        static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
+    int64_t partition_id = metadata->partition_id;
+    HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
+    scanner_ctx_.reset(new ScannerContext(
+        runtime_state_, this, partition, scan_range_, filter_ctxs()));
+    RETURN_IF_ERROR(CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_));
+  }
+
+  Status status = scanner_->GetNext(row_batch);
+  if (!status.ok()) {
+    scanner_->Close(row_batch);
+    scanner_.reset();
+    num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
+    return status;
+  }
+
+  num_rows_returned_ += row_batch->num_rows();
+  if (ReachedLimit()) {
+    int num_rows_over = num_rows_returned_ - limit_;
+    row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
+    num_rows_returned_ -= num_rows_over;
+    scan_range_ = NULL;
+    scanner_->Close(row_batch);
+    scanner_.reset();
+    *eos = true;
+  }
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
+
+  if (*eos) StopAndFinalizeCounters();
+  return Status::OK();
+}
+
+void HdfsScanNodeMt::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  scanner_.reset();
+  scanner_ctx_.reset();
+  HdfsScanNodeBase::Close(state);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
new file mode 100644
index 0000000..7829d47
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_MT_H_
+#define IMPALA_EXEC_HDFS_SCAN_NODE_MT_H_
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/hdfs-scanner.h"
+#include "exec/hdfs-scan-node-base.h"
+#include "exec/scanner-context.h"
+
+namespace impala {
+
+class DescriptorTbl;
+class ObjectPool;
+class RuntimeState;
+class RowBatch;
+class TPlanNode;
+
+/// Scan node that materializes tuples, evaluates conjuncts and runtime filters
+/// in the thread calling GetNext(). Uses the HdfsScanner::GetNext() interface.
+class HdfsScanNodeMt : public HdfsScanNodeBase {
+ public:
+  HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~HdfsScanNodeMt();
+
+  virtual Status Prepare(RuntimeState* state);
+  virtual Status Open(RuntimeState* state);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual void Close(RuntimeState* state);
+
+  virtual bool HasRowBatchQueue() const { return false; }
+
+ private:
+  /// Current scan range and corresponding scanner.
+  DiskIoMgr::ScanRange* scan_range_;
+  boost::scoped_ptr<ScannerContext> scanner_ctx_;
+  boost::scoped_ptr<HdfsScanner> scanner_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index e0cee4d..83f6452 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -16,70 +16,31 @@
 // under the License.
 
 #include "exec/hdfs-scan-node.h"
-#include "exec/base-sequence-scanner.h"
-#include "exec/hdfs-text-scanner.h"
-#include "exec/hdfs-lzo-text-scanner.h"
-#include "exec/hdfs-sequence-scanner.h"
-#include "exec/hdfs-rcfile-scanner.h"
-#include "exec/hdfs-avro-scanner.h"
-#include "exec/hdfs-parquet-scanner.h"
 
 #include <sstream>
-#include <avro/errors.h>
-#include <avro/schema.h>
-#include <boost/algorithm/string.hpp>
-#include <boost/filesystem.hpp>
-#include <gutil/strings/substitute.h>
 
-#include <hdfs.h>
-
-#include "codegen/llvm-codegen.h"
 #include "common/logging.h"
-#include "common/object-pool.h"
-#include "exprs/expr-context.h"
+#include "exec/hdfs-scanner.h"
+#include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
-#include "runtime/hdfs-fs-cache.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
-#include "runtime/string-buffer.h"
 #include "scheduling/query-resource-mgr.h"
-#include "util/bit-util.h"
-#include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
-#include "util/error-util.h"
-#include "util/hdfs-util.h"
-#include "util/impalad-metrics.h"
-#include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
 
-#include "gen-cpp/PlanNodes_types.h"
-
 #include "common/names.h"
 
 DEFINE_int32(max_row_batches, 0, "the maximum size of materialized_row_batches_");
-DEFINE_bool(suppress_unknown_disk_id_warnings, false,
-    "Suppress unknown disk id warnings generated when the HDFS implementation does not"
-    " provide volume/disk information.");
-DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
-    "that a scan node will wait for expected runtime filters to arrive.");
 
 #ifndef NDEBUG
 DECLARE_bool(skip_file_runtime_filtering);
 #endif
 
-namespace filesystem = boost::filesystem;
 using namespace impala;
-using namespace llvm;
-using namespace strings;
-using boost::algorithm::join;
-
-const string HdfsScanNode::HDFS_SPLIT_STATS_DESC =
-    "Hdfs split stats (<volume id>:<# splits>/<split lengths>)";
 
 // Amount of memory that we approximate a scanner thread will use not including IoBuffers.
 // The memory used does not vary considerably between file formats (just a couple of MBs).
@@ -91,37 +52,17 @@ const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
 // estimate scanner thread memory usage.
 const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
 
-// Determines how many unexpected remote bytes trigger an error in the runtime state
-const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
-
 // Amount of time to block waiting for GetNext() to release scanner threads between
 // checking if a scanner thread should yield itself back to the global thread pool.
 const int SCANNER_THREAD_WAIT_TIME_MS = 20;
 
 HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
-    : ScanNode(pool, tnode, descs),
-      runtime_state_(NULL),
-      skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
-          tnode.hdfs_scan_node.skip_header_line_count : 0),
-      tuple_id_(tnode.hdfs_scan_node.tuple_id),
-      reader_context_(NULL),
-      tuple_desc_(NULL),
-      hdfs_table_(NULL),
-      unknown_disk_id_warned_(false),
-      initial_ranges_issued_(false),
+    : HdfsScanNodeBase(pool, tnode, descs),
       ranges_issued_barrier_(1),
       scanner_thread_bytes_required_(0),
-      max_compressed_text_file_length_(NULL),
-      disks_accessed_bitmap_(TUnit::UNIT, 0),
-      bytes_read_local_(NULL),
-      bytes_read_short_circuit_(NULL),
-      bytes_read_dn_cache_(NULL),
-      num_remote_ranges_(NULL),
-      unexpected_remote_bytes_(NULL),
       done_(false),
       all_ranges_started_(false),
-      counters_running_(false),
       thread_avail_cb_id_(-1),
       rm_callback_id_(-1),
       max_num_scanner_threads_(CpuInfo::num_cores()) {
@@ -139,99 +80,6 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
 HdfsScanNode::~HdfsScanNode() {
 }
 
-Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-
-  // Add collection item conjuncts
-  const map<TTupleId, vector<TExpr>>& collection_conjuncts =
-      tnode.hdfs_scan_node.collection_conjuncts;
-  map<TTupleId, vector<TExpr>>::const_iterator iter = collection_conjuncts.begin();
-  for (; iter != collection_conjuncts.end(); ++iter) {
-    DCHECK(conjuncts_map_[iter->first].empty());
-    RETURN_IF_ERROR(
-        Expr::CreateExprTrees(pool_, iter->second, &conjuncts_map_[iter->first]));
-  }
-
-  const TQueryOptions& query_options = state->query_options();
-  for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
-    auto it = filter.planid_to_target_ndx.find(tnode.node_id);
-    DCHECK(it != filter.planid_to_target_ndx.end());
-    const TRuntimeFilterTargetDesc& target = filter.targets[it->second];
-    if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
-        !target.is_local_target) {
-      continue;
-    }
-    if (query_options.disable_row_runtime_filtering &&
-        !target.is_bound_by_partition_columns) {
-      continue;
-    }
-
-    FilterContext filter_ctx;
-    RETURN_IF_ERROR(Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr));
-    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false);
-
-    string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id,
-        PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
-    RuntimeProfile* profile = state->obj_pool()->Add(
-        new RuntimeProfile(state->obj_pool(), filter_profile_title));
-    runtime_profile_->AddChild(profile);
-    filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
-        target.is_bound_by_partition_columns));
-
-    filter_ctxs_.push_back(filter_ctx);
-  }
-
-  // Add row batch conjuncts
-  DCHECK(conjuncts_map_[tuple_id_].empty());
-  conjuncts_map_[tuple_id_] = conjunct_ctxs_;
-
-  return Status::OK();
-}
-
-bool HdfsScanNode::FilePassesFilterPredicates(const vector<FilterContext>& filter_ctxs,
-    const THdfsFileFormat::type& format, HdfsFileDesc* file) {
-#ifndef NDEBUG
-  if (FLAGS_skip_file_runtime_filtering) return true;
-#endif
-  if (filter_ctxs_.size() == 0) return true;
-  ScanRangeMetadata* metadata =
-      reinterpret_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
-  if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
-          filter_ctxs)) {
-    for (int j = 0; j < file->splits.size(); ++j) {
-      // Mark range as complete to ensure progress.
-      RangeComplete(format, file->file_compression);
-    }
-    return false;
-  }
-  return true;
-}
-
-bool HdfsScanNode::WaitForRuntimeFilters(int32_t time_ms) {
-  vector<string> arrived_filter_ids;
-  int32_t start = MonotonicMillis();
-  for (auto& ctx: filter_ctxs_) {
-    if (ctx.filter->WaitForArrival(time_ms)) {
-      arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id()));
-    }
-  }
-  int32_t end = MonotonicMillis();
-  const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS);
-
-  if (arrived_filter_ids.size() == filter_ctxs_.size()) {
-    runtime_profile()->AddInfoString("Runtime filters",
-        Substitute("All filters arrived. Waited $0", wait_time));
-    VLOG_QUERY << "Filters arrived. Waited " << wait_time;
-    return true;
-  }
-
-  const string& filter_str = Substitute("Only following filters arrived: $0, waited $1",
-      join(arrived_filter_ids, ", "), wait_time);
-  runtime_profile()->AddInfoString("Runtime filters", filter_str);
-  VLOG_QUERY << filter_str;
-  return false;
-}
-
 Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
 
@@ -241,35 +89,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     // so we need to tell them there is work to do.
     // TODO: This is probably not worth splitting the organisational cost of splitting
     // initialisation across two places. Move to before the scanner threads start.
-    initial_ranges_issued_ = true;
-
-    int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
-    if (state->query_options().runtime_filter_wait_time_ms > 0) {
-      wait_time_ms = state->query_options().runtime_filter_wait_time_ms;
-    }
-    if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms);
-    // Apply dynamic partition-pruning per-file.
-    FileFormatsMap matching_per_type_files;
-    for (const FileFormatsMap::value_type& v: per_type_files_) {
-      vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first];
-      for (HdfsFileDesc* file: v.second) {
-        if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) {
-          matching_files->push_back(file);
-        }
-      }
-    }
-
-    // Issue initial ranges for all file types.
-    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::PARQUET]));
-    RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::TEXT]));
-    RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
-    RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::RC_FILE]));
-    RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::AVRO]));
+    RETURN_IF_ERROR(IssueInitialScanRanges(state));
 
     // Release the scanner threads
     ranges_issued_barrier_.Notify();
@@ -278,7 +98,11 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   }
 
   Status status = GetNextInternal(state, row_batch, eos);
-  if (!status.ok() || *eos) StopAndFinalizeCounters();
+  if (!status.ok() || *eos) {
+    unique_lock<mutex> l(lock_);
+    lock_guard<SpinLock> l2(file_type_counts_);
+    StopAndFinalizeCounters();
+  }
   return status;
 }
 
@@ -326,283 +150,16 @@ Status HdfsScanNode::GetNextInternal(
   return status_;
 }
 
-DiskIoMgr::ScanRange* HdfsScanNode::AllocateScanRange(
-    hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
-    int disk_id, bool try_cache, bool expected_local, int64_t mtime,
-    const DiskIoMgr::ScanRange* original_split) {
-  DCHECK_GE(disk_id, -1);
-  // Require that the scan range is within [0, file_length). While this cannot be used
-  // to guarantee safety (file_length metadata may be stale), it avoids different
-  // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
-  // beyond the end of the file).
-  DCHECK_GE(offset, 0);
-  DCHECK_GE(len, 0);
-  DCHECK_LE(offset + len, GetFileDesc(file)->file_length)
-      << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
-  disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
-
-  ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add(
-        new ScanRangeMetadata(partition_id, original_split));
-  DiskIoMgr::ScanRange* range =
-      runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
-  range->Reset(fs, file, len, offset, disk_id, try_cache, expected_local,
-      mtime, metadata);
-  return range;
-}
-
-HdfsFileDesc* HdfsScanNode::GetFileDesc(const string& filename) {
-  DCHECK(file_descs_.find(filename) != file_descs_.end());
-  return file_descs_[filename];
-}
-
-void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
-  unique_lock<mutex> l(metadata_lock_);
-  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
-  per_file_metadata_[filename] = metadata;
-}
-
-void* HdfsScanNode::GetFileMetadata(const string& filename) {
-  unique_lock<mutex> l(metadata_lock_);
-  map<string, void*>::iterator it = per_file_metadata_.find(filename);
-  if (it == per_file_metadata_.end()) return NULL;
-  return it->second;
-}
-
-void* HdfsScanNode::GetCodegenFn(THdfsFileFormat::type type) {
-  CodegendFnMap::iterator it = codegend_fn_map_.find(type);
-  if (it == codegend_fn_map_.end()) return NULL;
-  return it->second;
-}
-
-Status HdfsScanNode::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
-    ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
-  DCHECK(context != NULL);
-  THdfsCompression::type compression =
-      context->GetStream()->file_desc()->file_compression;
-
-  // Create a new scanner for this file format and compression.
-  switch (partition->file_format()) {
-    case THdfsFileFormat::TEXT:
-      // Lzo-compressed text files are scanned by a scanner that it is implemented as a
-      // dynamic library, so that Impala does not include GPL code.
-      if (compression == THdfsCompression::LZO) {
-        scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_));
-      } else {
-        scanner->reset(new HdfsTextScanner(this, runtime_state_, true));
-      }
-      break;
-    case THdfsFileFormat::SEQUENCE_FILE:
-      scanner->reset(new HdfsSequenceScanner(this, runtime_state_, true));
-      break;
-    case THdfsFileFormat::RC_FILE:
-      scanner->reset(new HdfsRCFileScanner(this, runtime_state_, true));
-      break;
-    case THdfsFileFormat::AVRO:
-      scanner->reset(new HdfsAvroScanner(this, runtime_state_, true));
-      break;
-    case THdfsFileFormat::PARQUET:
-      scanner->reset(new HdfsParquetScanner(this, runtime_state_, true));
-      break;
-    default:
-      return Status(Substitute("Unknown Hdfs file format type: $0",
-          partition->file_format()));
-  }
-  DCHECK(scanner->get() != NULL);
-  Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
-  if (status.ok()) {
-    status = scanner->get()->Open(context);
-    if (!status.ok()) scanner->get()->Close(scanner->get()->batch());
-  } else {
-    context->ClearStreams();
-  }
-  return status;
-}
-
-Tuple* HdfsScanNode::InitTemplateTuple(RuntimeState* state,
-    const vector<ExprContext*>& value_ctxs) {
-  if (partition_key_slots_.empty()) return NULL;
-
-  // Lock to protect access to partition_key_pool_ and value_ctxs
-  // TODO: we can push the lock to the mempool and exprs_values should not
-  // use internal memory.
-  Tuple* template_tuple = InitEmptyTemplateTuple(*tuple_desc_);
-
-  unique_lock<mutex> l(lock_);
-  for (int i = 0; i < partition_key_slots_.size(); ++i) {
-    const SlotDescriptor* slot_desc = partition_key_slots_[i];
-    // Exprs guaranteed to be literals, so can safely be evaluated without a row context
-    void* value = value_ctxs[slot_desc->col_pos()]->GetValue(NULL);
-    RawValue::Write(value, template_tuple, slot_desc, NULL);
-  }
-  return template_tuple;
-}
-
-Tuple* HdfsScanNode::InitEmptyTemplateTuple(const TupleDescriptor& tuple_desc) {
-  Tuple* template_tuple = NULL;
-  {
-    unique_lock<mutex> l(lock_);
-    template_tuple = Tuple::Create(tuple_desc.byte_size(), scan_node_pool_.get());
-  }
-  memset(template_tuple, 0, tuple_desc.byte_size());
-  return template_tuple;
-}
-
-void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
-  unique_lock<mutex> l(lock_);
-  scan_node_pool_->AcquireData(pool, false);
-}
-
-Status HdfsScanNode::TriggerDebugAction() {
-  return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
-}
-
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  runtime_state_ = state;
-  RETURN_IF_ERROR(ScanNode::Prepare(state));
-
-  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
-  DCHECK(tuple_desc_ != NULL);
-
-  // Prepare collection conjuncts
-  ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
-  for (; iter != conjuncts_map_.end(); ++iter) {
-    TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(iter->first);
-
-    // conjuncts_ are already prepared in ExecNode::Prepare(), don't try to prepare again
-    if (tuple_desc == tuple_desc_) continue;
-
-    RowDescriptor* collection_row_desc =
-        state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false));
-    RETURN_IF_ERROR(
-        Expr::Prepare(iter->second, state, *collection_row_desc, expr_mem_tracker()));
-  }
+  RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
 
+  // Assign scanner thread group to cgroup, if any.
   if (!state->cgroup().empty()) {
     scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr());
     scanner_threads_.SetCgroup(state->cgroup());
   }
 
-  // One-time initialisation of state that is constant across scan ranges
-  DCHECK(tuple_desc_->table_desc() != NULL);
-  hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
-  scan_node_pool_.reset(new MemPool(mem_tracker()));
-
-  for (int i = 0; i < filter_ctxs_.size(); ++i) {
-    RETURN_IF_ERROR(
-        filter_ctxs_[i].expr->Prepare(state, row_desc(), expr_mem_tracker()));
-    AddExprCtxToFree(filter_ctxs_[i].expr);
-  }
-
-  // Parse Avro table schema if applicable
-  const string& avro_schema_str = hdfs_table_->avro_schema();
-  if (!avro_schema_str.empty()) {
-    avro_schema_t avro_schema;
-    int error = avro_schema_from_json_length(
-        avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema);
-    if (error != 0) {
-      return Status(Substitute("Failed to parse table schema: $0", avro_strerror()));
-    }
-    RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get()));
-  }
-
-  // Gather materialized partition-key slots and non-partition slots.
-  const vector<SlotDescriptor*>& slots = tuple_desc_->slots();
-  for (size_t i = 0; i < slots.size(); ++i) {
-    if (hdfs_table_->IsClusteringCol(slots[i])) {
-      partition_key_slots_.push_back(slots[i]);
-    } else {
-      materialized_slots_.push_back(slots[i]);
-    }
-  }
-
-  // Order the materialized slots such that for schemaless file formats (e.g. text) the
-  // order corresponds to the physical order in files. For formats where the file schema
-  // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary.
-  sort(materialized_slots_.begin(), materialized_slots_.end(),
-      SlotDescriptor::ColPathLessThan);
-
-  // Populate mapping from slot path to index into materialized_slots_.
-  for (int i = 0; i < materialized_slots_.size(); ++i) {
-    path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i;
-  }
-
-  // Initialize is_materialized_col_
-  is_materialized_col_.resize(hdfs_table_->num_cols());
-  for (int i = 0; i < hdfs_table_->num_cols(); ++i) {
-    is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN;
-  }
-
-  HdfsFsCache::HdfsFsMap fs_cache;
-  // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate
-  // partition_ids_, file_descs_, and per_type_files_.
-  DCHECK(scan_range_params_ != NULL)
-      << "Must call SetScanRanges() before calling Prepare()";
-  int num_ranges_missing_volume_id = 0;
-  for (int i = 0; i < scan_range_params_->size(); ++i) {
-    DCHECK((*scan_range_params_)[i].scan_range.__isset.hdfs_file_split);
-    const THdfsFileSplit& split = (*scan_range_params_)[i].scan_range.hdfs_file_split;
-    partition_ids_.insert(split.partition_id);
-    HdfsPartitionDescriptor* partition_desc =
-        hdfs_table_->GetPartition(split.partition_id);
-    if (partition_desc == NULL) {
-      // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
-      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-                 << " partition_id=" << split.partition_id
-                 << "\n" << PrintThrift(state->fragment_params());
-      return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
-                    " Try rerunning the query.");
-    }
-
-    if (partition_template_tuple_map_.find(split.partition_id) ==
-        partition_template_tuple_map_.end()) {
-      partition_template_tuple_map_[split.partition_id] =
-          InitTemplateTuple(state, partition_desc->partition_key_value_ctxs());;
-    }
-
-    filesystem::path file_path(partition_desc->location());
-    file_path.append(split.file_name, filesystem::path::codecvt());
-    const string& native_file_path = file_path.native();
-
-    HdfsFileDesc* file_desc = NULL;
-    FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path);
-    if (file_desc_it == file_descs_.end()) {
-      // Add new file_desc to file_descs_ and per_type_files_
-      file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
-      file_descs_[native_file_path] = file_desc;
-      file_desc->file_length = split.file_length;
-      file_desc->mtime = split.mtime;
-      file_desc->file_compression = split.file_compression;
-      RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-          native_file_path, &file_desc->fs, &fs_cache));
-      num_unqueued_files_.Add(1);
-      per_type_files_[partition_desc->file_format()].push_back(file_desc);
-    } else {
-      // File already processed
-      file_desc = file_desc_it->second;
-    }
-
-    bool expected_local =
-        (*scan_range_params_)[i].__isset.is_remote && !(*scan_range_params_)[i].is_remote;
-    if (expected_local && (*scan_range_params_)[i].volume_id == -1) {
-      if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) {
-        runtime_profile()->AppendExecOption("Missing Volume Id");
-        runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK));
-        unknown_disk_id_warned_ = true;
-      }
-      ++num_ranges_missing_volume_id;
-    }
-
-    bool try_cache = (*scan_range_params_)[i].is_cached;
-    if (runtime_state_->query_options().disable_cached_reads) {
-      DCHECK(!try_cache) << "Params should not have had this set.";
-    }
-    file_desc->splits.push_back(
-        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
-            split.offset, split.partition_id, (*scan_range_params_)[i].volume_id,
-            try_cache, expected_local, file_desc->mtime));
-  }
-
   // Compute the minimum bytes required to start a new thread. This is based on the
   // file format.
   // The higher the estimate, the less likely it is the query will fail but more likely
@@ -632,100 +189,17 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
     }
   }
   scanner_thread_bytes_required_ += scanner_thread_mem_usage;
-
-  // Prepare all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    // This is IMPALA-1702, but will have been caught earlier in this method.
-    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
-                                   << " partition_id=" << partition_id
-                                   << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->PrepareExprs(state));
-  }
-
-  // Update server wide metrics for number of scan ranges and ranges that have
-  // incomplete metadata.
-  ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
-  ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
-
-  // Add per volume stats to the runtime profile
-  PerVolumnStats per_volume_stats;
-  stringstream str;
-  UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
-  PrintHdfsSplitStats(per_volume_stats, &str);
-  runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str());
-
-  // Create codegen'd functions
-  for (int format = THdfsFileFormat::TEXT;
-       format <= THdfsFileFormat::PARQUET; ++format) {
-    vector<HdfsFileDesc*>& file_descs =
-        per_type_files_[static_cast<THdfsFileFormat::type>(format)];
-
-    if (file_descs.empty()) continue;
-
-    // Randomize the order this node processes the files. We want to do this to avoid
-    // issuing remote reads to the same DN from different impalads. In file formats such
-    // as avro/seq/rc (i.e. splittable with a header), every node first reads the header.
-    // If every node goes through the files in the same order, all the remote reads are
-    // for the same file meaning a few DN serves a lot of remote reads at the same time.
-    random_shuffle(file_descs.begin(), file_descs.end());
-
-    // Create reusable codegen'd functions for each file type type needed
-    // TODO: do this for conjuncts_map_
-    Function* fn;
-    Status status;
-    switch (format) {
-      case THdfsFileFormat::TEXT:
-        status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn);
-        break;
-      case THdfsFileFormat::SEQUENCE_FILE:
-        status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn);
-        break;
-      case THdfsFileFormat::AVRO:
-        status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn);
-        break;
-      case THdfsFileFormat::PARQUET:
-        status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, &fn);
-        break;
-      default:
-        // No codegen for this format
-        fn = NULL;
-        status = Status("Not implemented for this format.");
-    }
-    DCHECK(fn != NULL || !status.ok());
-
-    const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
-    if (!status.ok()) {
-      runtime_profile()->AddCodegenMsg(false, status, format_name);
-    } else {
-      runtime_profile()->AddCodegenMsg(true, status, format_name);
-      LlvmCodeGen* codegen;
-      RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
-      codegen->AddFunctionToJit(
-          fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]);
-    }
-  }
-
   return Status::OK();
 }
 
-// This function initiates the connection to hdfs and starts up the initial scanner
-// threads. The scanner subclasses are passed the initial splits. Scanners are expected to
-// queue up a non-zero number of those splits to the io mgr (via the ScanNode). Scan
-// ranges are not issued until the first GetNext() call; scanner threads will block on
-// ranges_issued_barrier_ until ranges are issued.
+// This function registers the ThreadTokenAvailableCb to start up the initial scanner
+// threads. Scan ranges are not issued until the first GetNext() call; scanner threads
+// will block on ranges_issued_barrier_ until ranges are issued.
 Status HdfsScanNode::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Open(state));
-
-  // Open collection conjuncts
-  ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
-  for (; iter != conjuncts_map_.end(); ++iter) {
-    // conjuncts_ are already opened in ExecNode::Open()
-    if (iter->first == tuple_id_) continue;
-    RETURN_IF_ERROR(Expr::Open(iter->second, state));
-  }
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
 
-  for (auto& filter_ctx: filter_ctxs_) RETURN_IF_ERROR(filter_ctx.expr->Open(state));
+  if (file_descs_.empty() || progress_.done()) return Status::OK();
 
   // We need at least one scanner thread to make progress. We need to make this
   // reservation before any ranges are issued.
@@ -744,95 +218,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
             runtime_state_->resource_pool()));
   }
 
-  if (file_descs_.empty()) {
-    SetDone();
-    return Status::OK();
-  }
-
-  // Open all the partition exprs used by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
-                                   << " partition_id=" << partition_id
-                                   << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->OpenExprs(state));
-  }
-
-  RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
-      &reader_context_, mem_tracker()));
-
-  // Initialize HdfsScanNode specific counters
-  read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
-  per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
-      PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
-      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
-  scan_ranges_complete_counter_ =
-      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
-  if (DiskInfo::num_disks() < 64) {
-    num_disks_accessed_counter_ =
-        ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
-  } else {
-    num_disks_accessed_counter_ = NULL;
-  }
-  num_scanner_threads_started_counter_ =
-      ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
-
-  runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, bytes_read_counter());
-  runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
-  runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
-      &active_hdfs_read_thread_counter_);
-  runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
-      &disks_accessed_bitmap_);
-
-  average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
-      AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
-  average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
-      AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
-
-  bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
-      TUnit::BYTES);
-  bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
-      TUnit::BYTES);
-  bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
-      TUnit::BYTES);
-  num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
-      TUnit::UNIT);
-  unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
-      TUnit::BYTES);
-
-  max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
-      "MaxCompressedTextFileLength", TUnit::BYTES);
-
-  for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
-    hdfs_read_thread_concurrency_bucket_.push_back(
-        pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  }
-  runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
-      &hdfs_read_thread_concurrency_bucket_);
-
-  counters_running_ = true;
-
-  int total_splits = 0;
-  for (FileDescMap::iterator it = file_descs_.begin(); it != file_descs_.end(); ++it) {
-    total_splits += it->second->splits.size();
-  }
-
-  if (total_splits == 0) {
-    SetDone();
-    return Status::OK();
-  }
-
-  stringstream ss;
-  ss << "Splits complete (node=" << id() << "):";
-  progress_.Init(ss.str(), total_splits);
   return Status::OK();
 }
 
-Status HdfsScanNode::Reset(RuntimeState* state) {
-  DCHECK(false) << "Internal error: Scan nodes should not appear in subplans.";
-  return Status("Internal error: Scan nodes should not appear in subplans.");
-}
-
 void HdfsScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SetDone();
@@ -849,47 +237,36 @@ void HdfsScanNode::Close(RuntimeState* state) {
   num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
   DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";
 
-  if (reader_context_ != NULL) {
-    // There may still be io buffers used by parent nodes so we can't unregister the
-    // reader context yet. The runtime state keeps a list of all the reader contexts and
-    // they are unregistered when the fragment is closed.
-    state->reader_contexts()->push_back(reader_context_);
-    // Need to wait for all the active scanner threads to finish to ensure there is no
-    // more memory tracked by this scan node's mem tracker.
-    state->io_mgr()->CancelContext(reader_context_, true);
-  }
-
-  StopAndFinalizeCounters();
+  HdfsScanNodeBase::Close(state);
+}
 
-  // There should be no active scanner threads and hdfs read threads.
-  DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
-  DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
+void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
+  unique_lock<mutex> l(metadata_lock_);
+  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
+  per_file_metadata_[filename] = metadata;
+}
 
-  if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
+void* HdfsScanNode::GetFileMetadata(const string& filename) {
+  unique_lock<mutex> l(metadata_lock_);
+  map<string, void*>::iterator it = per_file_metadata_.find(filename);
+  if (it == per_file_metadata_.end()) return NULL;
+  return it->second;
+}
 
-  // Close all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    if (partition_desc == NULL) {
-      // TODO: Revert when IMPALA-1702 is fixed.
-      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-                 << " partition_id=" << partition_id
-                 << "\n" << PrintThrift(state->fragment_params());
-      continue;
-    }
-    partition_desc->CloseExprs(state);
-  }
+void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
+    const std::vector<THdfsCompression::type>& compression_type) {
+  lock_guard<SpinLock> l(file_type_counts_);
+  HdfsScanNodeBase::RangeComplete(file_type, compression_type);
+}
 
-  // Close collection conjuncts
-  ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
-  for (; iter != conjuncts_map_.end(); ++iter) {
-    // conjuncts_ are already closed in ExecNode::Close()
-    if (iter->first == tuple_id_) continue;
-    Expr::Close(iter->second, state);
-  }
+void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
+  unique_lock<mutex> l(lock_);
+  scan_node_pool_->AcquireData(pool, false);
+}
 
-  for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr->Close(state);
-  ScanNode::Close(state);
+void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
+  InitNullCollectionValues(row_batch);
+  materialized_row_batches_->AddBatch(row_batch);
 }
 
 Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
@@ -902,43 +279,6 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges
   return Status::OK();
 }
 
-void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
-  InitNullCollectionValues(row_batch);
-  materialized_row_batches_->AddBatch(row_batch);
-}
-
-void HdfsScanNode::InitNullCollectionValues(const TupleDescriptor* tuple_desc,
-    Tuple* tuple) const {
-  for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) {
-    CollectionValue* slot = reinterpret_cast<CollectionValue*>(
-        tuple->GetSlot(slot_desc->tuple_offset()));
-    if (tuple->IsNull(slot_desc->null_indicator_offset())) {
-      *slot = CollectionValue();
-      continue;
-    }
-    // Recursively traverse collection items.
-    const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor();
-    if (item_desc->collection_slots().empty()) continue;
-    for (int i = 0; i < slot->num_tuples; ++i) {
-      int item_offset = i * item_desc->byte_size();
-      Tuple* collection_item = reinterpret_cast<Tuple*>(slot->ptr + item_offset);
-      InitNullCollectionValues(item_desc, collection_item);
-    }
-  }
-}
-
-void HdfsScanNode::InitNullCollectionValues(RowBatch* row_batch) const {
-  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
-  const TupleDescriptor& tuple_desc =
-      *row_batch->row_desc().tuple_descriptors()[tuple_idx()];
-  if (tuple_desc.collection_slots().empty()) return;
-  for (int i = 0; i < row_batch->num_rows(); ++i) {
-    Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
-    DCHECK(tuple != NULL);
-    InitNullCollectionValues(&tuple_desc, tuple);
-  }
-}
-
 // For controlling the amount of memory used for scanners, we approximate the
 // scanner mem usage based on scanner_thread_bytes_required_, rather than the
 // consumption in the scan node's mem tracker. The problem with the scan node
@@ -1161,33 +501,6 @@ void HdfsScanNode::ScannerThread() {
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 
-bool HdfsScanNode::PartitionPassesFilters(int32_t partition_id,
-    const string& stats_name, const vector<FilterContext>& filter_ctxs) {
-  if (filter_ctxs.size() == 0) return true;
-  DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
-      << "Mismatched number of filter contexts";
-  Tuple* template_tuple = partition_template_tuple_map_[partition_id];
-  // Defensive - if template_tuple is NULL, there can be no filters on partition columns.
-  if (template_tuple == NULL) return true;
-  TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
-  for (const FilterContext& ctx: filter_ctxs) {
-    int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
-    if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
-      continue;
-    }
-    void* e = ctx.expr->GetValue(tuple_row_mem);
-
-    // Not quite right because bitmap could arrive after Eval(), but we're ok with
-    // off-by-one errors.
-    bool processed = ctx.filter->HasBloomFilter();
-    bool passed_filter = ctx.filter->Eval<void>(e, ctx.expr->root()->type());
-    ctx.stats->IncrCounters(stats_name, 1, processed, !passed_filter);
-    if (!passed_filter) return false;
-  }
-
-  return true;
-}
-
 namespace {
 
 // Returns true if 'format' uses a scanner derived from BaseSequenceScanner. Used to
@@ -1205,8 +518,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
 
   DCHECK(scan_range != NULL);
 
-  ScanRangeMetadata* metadata =
-      reinterpret_cast<ScanRangeMetadata*>(scan_range->meta_data());
+  ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
   int64_t partition_id = metadata->partition_id;
   HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
   DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
@@ -1269,26 +581,6 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   return status;
 }
 
-void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
-    const THdfsCompression::type& compression_type) {
-  vector<THdfsCompression::type> types;
-  types.push_back(compression_type);
-  RangeComplete(file_type, types);
-}
-
-void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
-    const vector<THdfsCompression::type>& compression_types) {
-  scan_ranges_complete_counter()->Add(1);
-  progress_.Update(1);
-
-  {
-    lock_guard<SpinLock> l(file_type_counts_lock_);
-    for (int i = 0; i < compression_types.size(); ++i) {
-      ++file_type_counts_[make_pair(file_type, compression_types[i])];
-    }
-  }
-}
-
 void HdfsScanNode::SetDone() {
   {
     unique_lock<mutex> l(lock_);
@@ -1300,129 +592,3 @@ void HdfsScanNode::SetDone() {
   }
   materialized_row_batches_->Shutdown();
 }
-
-void HdfsScanNode::ComputeSlotMaterializationOrder(vector<int>* order) const {
-  const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs();
-  // Initialize all order to be conjuncts.size() (after the last conjunct)
-  order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
-
-  const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
-
-  vector<SlotId> slot_ids;
-  for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
-    slot_ids.clear();
-    int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
-    for (int j = 0; j < num_slots; ++j) {
-      SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
-      int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());
-      // slot_idx == -1 means this was a partition key slot which is always
-      // materialized before any slots.
-      if (slot_idx == -1) continue;
-      // If this slot hasn't been assigned an order, assign it be materialized
-      // before evaluating conjuncts[i]
-      if ((*order)[slot_idx] == conjuncts.size()) {
-        (*order)[slot_idx] = conjunct_idx;
-      }
-    }
-  }
-}
-
-void HdfsScanNode::StopAndFinalizeCounters() {
-  unique_lock<mutex> l(lock_);
-  if (!counters_running_) return;
-  counters_running_ = false;
-
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
-  PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
-  PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
-      true);
-
-  // Output hdfs read thread concurrency into info string
-  stringstream ss;
-  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
-    ss << i << ":" << setprecision(4)
-       << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
-  }
-  runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
-
-  // Convert disk access bitmap to num of disk accessed
-  uint64_t num_disk_bitmap = disks_accessed_bitmap_.value();
-  int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap);
-  if (num_disks_accessed_counter_ != NULL) {
-    num_disks_accessed_counter_->Set(num_disk_accessed);
-  }
-
-  // output completed file types and counts to info string
-  if (!file_type_counts_.empty()) {
-    stringstream ss;
-    {
-      lock_guard<SpinLock> l2(file_type_counts_lock_);
-      for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
-          it != file_type_counts_.end(); ++it) {
-        ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
-      }
-    }
-    runtime_profile_->AddInfoString("File Formats", ss.str());
-  }
-
-  // Output fraction of scanners with codegen enabled
-  int num_enabled = num_scanners_codegen_enabled_.Load();
-  int total = num_enabled + num_scanners_codegen_disabled_.Load();
-  runtime_profile()->AppendExecOption(
-      Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
-
-  if (reader_context_ != NULL) {
-    bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
-    bytes_read_short_circuit_->Set(
-        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
-    bytes_read_dn_cache_->Set(
-        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
-    num_remote_ranges_->Set(static_cast<int64_t>(
-        runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
-    unexpected_remote_bytes_->Set(
-        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
-
-    if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
-      runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
-          "Read $0 of data across network that was expected to be local. "
-          "Block locality metadata for table '$1.$2' may be stale. Consider running "
-          "\"INVALIDATE METADATA `$1`.`$2`\".",
-          PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES),
-          hdfs_table_->database(), hdfs_table_->name())));
-    }
-
-    ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value());
-    ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment(
-        bytes_read_local_->value());
-    ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment(
-        bytes_read_short_circuit_->value());
-    ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
-        bytes_read_dn_cache_->value());
-  }
-}
-
-void HdfsScanNode::UpdateHdfsSplitStats(
-    const vector<TScanRangeParams>& scan_range_params_list,
-    PerVolumnStats* per_volume_stats) {
-  pair<int, int64_t> init_value(0, 0);
-  for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
-    const TScanRange& scan_range = scan_range_params.scan_range;
-    if (!scan_range.__isset.hdfs_file_split) continue;
-    const THdfsFileSplit& split = scan_range.hdfs_file_split;
-    pair<int, int64_t>* stats =
-        FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
-    ++(stats->first);
-    stats->second += split.length;
-  }
-}
-
-void HdfsScanNode::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
-    stringstream* ss) {
-  for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
-       i != per_volume_stats.end(); ++i) {
-     (*ss) << i->first << ":" << i->second.first << "/"
-         << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";
-  }
-}