Posted to commits@impala.apache.org by ab...@apache.org on 2016/09/08 04:39:35 UTC

[5/7] incubator-impala git commit: IMPALA-3905: Add single-threaded scan node.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 36e95b5..2efb24e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -19,83 +19,29 @@
 #ifndef IMPALA_EXEC_HDFS_SCAN_NODE_H_
 #define IMPALA_EXEC_HDFS_SCAN_NODE_H_
 
-#include <vector>
-#include <memory>
+#include <map>
 #include <stdint.h>
+#include <vector>
 
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "exec/filter-context.h"
-#include "exec/scan-node.h"
-#include "exec/scanner-context.h"
-#include "runtime/descriptors.h"
+#include "exec/hdfs-scan-node-base.h"
 #include "runtime/disk-io-mgr.h"
-#include "util/avro-util.h"
 #include "util/counting-barrier.h"
-#include "util/progress-updater.h"
-#include "util/spinlock.h"
 #include "util/thread.h"
 
-#include "gen-cpp/PlanNodes_types.h"
-
 namespace impala {
 
 class DescriptorTbl;
-class HdfsScanner;
+class ObjectPool;
+class RuntimeState;
 class RowBatch;
-class RuntimeFilter;
-class Status;
-class Tuple;
 class TPlanNode;
-class TScanRange;
-
-/// Maintains per file information for files assigned to this scan node.  This includes
-/// all the splits for the file. Note that it is not thread-safe.
-struct HdfsFileDesc {
-  /// Connection to the filesystem containing the file.
-  hdfsFS fs;
-
-  /// File name including the path.
-  std::string filename;
-
-  /// Length of the file. This is not related to which parts of the file have been
-  /// assigned to this node.
-  int64_t file_length;
-
-  /// Last modified time
-  int64_t mtime;
-
-  THdfsCompression::type file_compression;
-
-  /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node.
-  std::vector<DiskIoMgr::ScanRange*> splits;
-  HdfsFileDesc(const std::string& filename)
-    : filename(filename), file_length(0), mtime(0),
-      file_compression(THdfsCompression::NONE) {
-  }
-};
-
-/// Struct for additional metadata for scan ranges. This contains the partition id
-/// that this scan range is for.
-struct ScanRangeMetadata {
-  /// The partition id that this range is part of.
-  int64_t partition_id;
-
-  /// For parquet scan ranges we initially create a request for the file footer for each
-  /// split; we store a pointer to the actual split so that we can recover its information
-  /// for the scanner to process.
-  const DiskIoMgr::ScanRange* original_split;
-
-  ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split)
-      : partition_id(partition_id), original_split(original_split) { }
-};
 
-/// A ScanNode implementation that is used for all tables read directly from
-/// HDFS-serialised data.
+/// Legacy ScanNode implementation used in the non-multi-threaded execution mode
+/// for all tables read directly from HDFS-serialised data.
 /// A HdfsScanNode spawns multiple scanner threads to process the bytes in
 /// parallel.  There is a handshake between the scan node and the scanners
 /// to get all the splits queued and bytes processed.
@@ -111,147 +57,36 @@ struct ScanRangeMetadata {
 /// 5. The scanner finishes the scan range and informs the scan node so it can track
 ///    end of stream.
 ///
-/// An HdfsScanNode may expect to receive runtime filters produced elsewhere in the plan
-/// (even from remote fragments). These filters arrive asynchronously during execution,
-/// and are applied as soon as they arrive. Filters may be applied by the scan node in the
-/// following scopes:
-///
-/// 1. Per-file (all file formats, partition column filters only) - filtering at this
-/// scope saves IO as the filters are applied before scan ranges are issued.
-/// 2. Per-scan-range (all file formats, partition column filters only) - filtering at
-/// this scope saves CPU as filtered scan ranges are never scanned.
-///
-/// Scanners may also use the same filters to eliminate rows at finer granularities
-/// (e.g. per row).
-///
-/// TODO: this class allocates a bunch of small utility objects that should be
+/// TODO: This class allocates a bunch of small utility objects that should be
 /// recycled.
-class HdfsScanNode : public ScanNode {
+/// TODO: Remove this class once the fragment-based multi-threaded execution is
+/// fully functional.
+class HdfsScanNode : public HdfsScanNodeBase {
  public:
   HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-
   ~HdfsScanNode();
 
-  /// ExecNode methods
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
   virtual void Close(RuntimeState* state);
 
-  int limit() const { return limit_; }
-
-  const std::vector<SlotDescriptor*>& materialized_slots()
-      const { return materialized_slots_; }
-
-  /// Returns the tuple idx into the row for this scan node to output to.
-  /// Currently this is always 0.
-  int tuple_idx() const { return 0; }
-
-  /// Returns number of partition keys in the table.
-  int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
-
-  /// Returns number of partition key slots.
-  int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
-
-  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
-
-  const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
-
-  const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
-
-  RuntimeState* runtime_state() { return runtime_state_; }
+  virtual bool HasRowBatchQueue() const { return true; }
 
-  int skip_header_line_count() const { return skip_header_line_count_; }
+  bool done() const { return done_; }
 
-  DiskIoRequestContext* reader_context() { return reader_context_; }
-
-  typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
-  const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
-
-  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
-    return max_compressed_text_file_length_;
-  }
-
-  const static int SKIP_COLUMN = -1;
-
-  /// Returns index into materialized_slots with 'path'.  Returns SKIP_COLUMN if
-  /// that path is not materialized.
-  int GetMaterializedSlotIdx(const std::vector<int>& path) const {
-    PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
-    if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
-    return result->second;
-  }
-
-  /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
-  /// column i should be materialized.
-  const bool* is_materialized_col() {
-    return reinterpret_cast<const bool*>(&is_materialized_col_[0]);
-  }
-
-  /// Returns the per format codegen'd function.  Scanners call this to get the
-  /// codegen'd function to use.  Returns NULL if codegen should not be used.
-  void* GetCodegenFn(THdfsFileFormat::type);
-
-  inline void IncNumScannersCodegenEnabled() {
-    num_scanners_codegen_enabled_.Add(1);
-  }
-
-  inline void IncNumScannersCodegenDisabled() {
-    num_scanners_codegen_disabled_.Add(1);
-  }
+  /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
+  virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+      int num_files_queued);
 
   /// Adds a materialized row batch for the scan node.  This is called from scanner
   /// threads.
   /// This function will block if materialized_row_batches_ is full.
   void AddMaterializedRowBatch(RowBatch* row_batch);
 
-  /// Allocate a new scan range object, stored in the runtime state's object pool. For
-  /// scan ranges that correspond to the original hdfs splits, the partition id must be
-  /// set to the range's partition id. For other ranges (e.g. columns in parquet, read
-  /// past buffers), the partition_id is unused. expected_local should be true if this
-  /// scan range is not expected to require a remote read. The range must fall within
-  /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length.
-  /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
-  /// metadata of the scan range that is to be allocated.
+  /// Sets the scanner specific metadata for 'filename'.
   /// This is thread safe.
-  DiskIoMgr::ScanRange* AllocateScanRange(
-      hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
-      int disk_id, bool try_cache, bool expected_local, int64_t mtime,
-      const DiskIoMgr::ScanRange* original_split = NULL);
-
-  /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
-  /// 'num_files_queued' indicates how many file's scan ranges have been added
-  /// completely.  A file's scan ranges are added completely if no new scanner threads
-  /// will be needed to process that file besides the additional threads needed to
-  /// process those in 'ranges'.
-  Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
-      int num_files_queued);
-
-  /// Adds all splits for file_desc to the io mgr queue and indicates one file has
-  /// been added completely.
-  inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc) {
-    return AddDiskIoRanges(file_desc->splits, 1);
-  }
-
-  /// Allocates and initialises template_tuple_ with any values from the partition columns
-  /// for the current scan range
-  /// Returns NULL if there are no partition keys slots.
-  /// TODO: cache the tuple template in the partition object.
-  Tuple* InitTemplateTuple(RuntimeState* state,
-                           const std::vector<ExprContext*>& value_ctxs);
-
-  /// Allocates and return an empty template tuple (i.e. with no values filled in).
-  /// Scanners can use this method to initialize a template tuple even if there are no
-  /// partition keys slots (e.g. to hold Avro default values).
-  Tuple* InitEmptyTemplateTuple(const TupleDescriptor& tuple_desc);
-
-  /// Acquires all allocations from pool into scan_node_pool_. Thread-safe.
-  void TransferToScanNodePool(MemPool* pool);
-
-  /// Returns the file desc for 'filename'.  Returns NULL if filename is invalid.
-  HdfsFileDesc* GetFileDesc(const std::string& filename);
+  void SetFileMetadata(const std::string& filename, void* metadata);
 
   /// Gets scanner specific metadata for 'filename'.  Scanners can use this to store
   /// file header information.
@@ -259,117 +94,18 @@ class HdfsScanNode : public ScanNode {
   /// This is thread safe.
   void* GetFileMetadata(const std::string& filename);
 
-  /// Sets the scanner specific metadata for 'filename'.
-  /// This is thread safe.
-  void SetFileMetadata(const std::string& filename, void* metadata);
-
-  /// Called by the scanner when a range is complete.  Used to trigger done_ and
-  /// to log progress.  This *must* only be called after the scanner has completely
-  /// finished the scan range (i.e. context->Flush()), and has added the final
-  /// row batch to the row batch queue. Otherwise the last batch may be
-  /// lost due to racing with shutting down the row batch queue.
-  void RangeComplete(const THdfsFileFormat::type& file_type,
-      const THdfsCompression::type& compression_type);
-  /// Same as above except for when multiple compression codecs were used
-  /// in the file. The metrics are incremented for each compression_type.
-  void RangeComplete(const THdfsFileFormat::type& file_type,
+  /// Called by scanners when a range is complete. Used to record progress and set done_.
+  /// This *must* only be called after a scanner has completely finished its
+  /// scan range (i.e. context->Flush()), and has added the final row batch to the row
+  /// batch queue. Otherwise, we may lose the last batch due to racing with shutting down
+  /// the RowBatch queue.
+  virtual void RangeComplete(const THdfsFileFormat::type& file_type,
       const std::vector<THdfsCompression::type>& compression_type);
 
-  /// Utility function to compute the order in which to materialize slots to allow for
-  /// computing conjuncts as slots get materialized (on partial tuples).
-  /// 'order' will contain for each slot, the first conjunct it is associated with.
-  /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before
-  /// evaluating conjuncts[1].  Slots that are not referenced by any conjuncts will have
-  /// order set to conjuncts.size()
-  void ComputeSlotMaterializationOrder(std::vector<int>* order) const;
-
-  /// Returns true if there are no materialized slots, such as a count(*) over the table.
-  inline bool IsZeroSlotTableScan() const {
-    return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
-  }
-
-  /// map from volume id to <number of split, per volume split lengths>
-  typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats;
-
-  /// Update the per volume stats with the given scan range params list
-  static void UpdateHdfsSplitStats(
-      const std::vector<TScanRangeParams>& scan_range_params_list,
-      PerVolumnStats* per_volume_stats);
-
-  /// Output the per_volume_stats to stringstream. The output format is a list of:
-  /// <volume id>:<# splits>/<per volume split lengths>
-  static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
-      std::stringstream* ss);
-
-  /// Description string for the per volume stats output.
-  static const std::string HDFS_SPLIT_STATS_DESC;
-
-  /// Returns true if partition 'partition_id' passes all the filter predicates in
-  /// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the
-  /// counter groups in FilterStats, and is used to update the correct statistics.
-  ///
-  /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the
-  /// function returns true, or a set of filter contexts to evaluate.
-  bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
-      const std::vector<FilterContext>& filter_ctxs);
-
-  const std::vector<FilterContext> filter_ctxs() const { return filter_ctxs_; }
-
- protected:
-  friend class ScannerContext;
-  friend class HdfsScanner;
-
-  RuntimeState* runtime_state_;
-
-  // Number of header lines to skip at the beginning of each file of this table. Only set
-  // to values > 0 for hdfs text files.
-  const int skip_header_line_count_;
-
-  /// Tuple id resolved in Prepare() to set tuple_desc_
-  const int tuple_id_;
-
-  /// RequestContext object to use with the disk-io-mgr for reads.
-  DiskIoRequestContext* reader_context_;
-
-  /// Descriptor for tuples this scan node constructs
-  const TupleDescriptor* tuple_desc_;
-
-  /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
-  /// the partition columns for that partition materialized. Used to filter files and scan
-  /// ranges on partition-column filters. Populated in Prepare().
-  boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_;
-
-  /// Descriptor for the hdfs table, including partition and format metadata.
-  /// Set in Prepare, owned by RuntimeState
-  const HdfsTableDescriptor* hdfs_table_;
-
-  /// The root of the table's Avro schema, if we're scanning an Avro table.
-  ScopedAvroSchemaElement avro_schema_;
-
-  /// If true, the warning that some disk ids are unknown was logged.  Only log this once
-  /// per scan node since it can be noisy.
-  bool unknown_disk_id_warned_;
-
-  /// Partitions scanned by this scan node.
-  boost::unordered_set<int64_t> partition_ids_;
-
-  /// File path => file descriptor (which includes the file's splits)
-  typedef std::map<std::string, HdfsFileDesc*> FileDescMap;
-  FileDescMap file_descs_;
-
-  /// File format => file descriptors.
-  typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap;
-  FileFormatsMap per_type_files_;
-
-  /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
-  /// item tuples). Includes a copy of ExecNode.conjuncts_.
-  ConjunctsMap conjuncts_map_;
-
-  /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
-  /// the first call to GetNext(). The token manager, in a different thread, will read
-  /// this variable.
-  bool initial_ranges_issued_;
+  /// Acquires all allocations from pool into scan_node_pool_. Thread-safe.
+  void TransferToScanNodePool(MemPool* pool);
 
+ private:
   /// Released when initial ranges are issued in the first call to GetNext().
   CountingBarrier ranges_issued_barrier_;
 
@@ -378,44 +114,6 @@ class HdfsScanNode : public ScanNode {
   /// scanner threads.
   int64_t scanner_thread_bytes_required_;
 
-  /// Number of files that have not been issued from the scanners.
-  AtomicInt32 num_unqueued_files_;
-
-  /// Map of HdfsScanner objects to file types.  Only one scanner object will be
-  /// created for each file type.  Objects stored in runtime_state's pool.
-  typedef std::map<THdfsFileFormat::type, HdfsScanner*> ScannerMap;
-  ScannerMap scanner_map_;
-
-  /// Per scanner type codegen'd fn.
-  typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap;
-  CodegendFnMap codegend_fn_map_;
-
-  /// Maps from a slot's path to its index into materialized_slots_.
-  typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
-  PathToSlotIdxMap path_to_materialized_slot_idx_;
-
-  /// List of contexts for expected runtime filters for this scan node. These contexts are
-  /// cloned by individual scanners to be used in multi-threaded contexts, passed through
-  /// the per-scanner ScannerContext..
-  std::vector<FilterContext> filter_ctxs_;
-
-  /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>
-  /// for 0 <= i < total # columns in table
-  //
-  /// This should be a vector<bool>, but bool vectors are special-cased and not stored
-  /// internally as arrays, so instead we store as chars and cast to bools as needed
-  std::vector<char> is_materialized_col_;
-
-  /// Vector containing slot descriptors for all non-partition key slots.  These
-  /// descriptors are sorted in order of increasing col_pos.
-  std::vector<SlotDescriptor*> materialized_slots_;
-
-  /// Vector containing slot descriptors for all partition key slots.
-  std::vector<SlotDescriptor*> partition_key_slots_;
-
-  /// Keeps track of total splits and the number finished.
-  ProgressUpdater progress_;
-
   /// Scanner specific per file metadata (e.g. header information) and associated lock.
   /// This lock cannot be taken together with any other locks except lock_.
   boost::mutex metadata_lock_;
@@ -431,43 +129,15 @@ class HdfsScanNode : public ScanNode {
   /// Maximum size of materialized_row_batches_.
   int max_materialized_row_batches_;
 
-  /// This is the number of io buffers that are owned by the scan node and the scanners.
-  /// This is used just to help debug leaked io buffers to determine if the leak is
-  /// happening in the scanners vs other parts of the execution.
-  AtomicInt32 num_owned_io_buffers_;
-
-  /// Counters which track the number of scanners that have codegen enabled for the
-  /// materialize and conjuncts evaluation code paths.
-  AtomicInt32 num_scanners_codegen_enabled_;
-  AtomicInt32 num_scanners_codegen_disabled_;
-
-  /// The size of the largest compressed text file to be scanned. This is used to
-  /// estimate scanner thread memory usage.
-  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_;
-
-  /// Disk accessed bitmap
-  RuntimeProfile::Counter disks_accessed_bitmap_;
-
-  /// Total number of bytes read locally
-  RuntimeProfile::Counter* bytes_read_local_;
-
-  /// Total number of bytes read via short circuit read
-  RuntimeProfile::Counter* bytes_read_short_circuit_;
-
-  /// Total number of bytes read from data node cache
-  RuntimeProfile::Counter* bytes_read_dn_cache_;
-
-  /// Total number of remote scan ranges
-  RuntimeProfile::Counter* num_remote_ranges_;
-
-  /// Total number of bytes read remotely that were expected to be local
-  RuntimeProfile::Counter* unexpected_remote_bytes_;
-
   /// Lock protects access between scanner thread and main query thread (the one calling
   /// GetNext()) for all fields below.  If this lock and any other locks needs to be taken
   /// together, this lock must be taken first.
   boost::mutex lock_;
 
+  /// Protects file_type_counts_. Cannot be taken together with any other lock
+  /// except lock_, and if so, lock_ must be taken first.
+  SpinLock file_type_counts_;
+
   /// Flag signaling that all scanner threads are done.  This could be because they
   /// are finished, an error/cancellation occurred, or the limit was reached.
   /// Setting this to true triggers the scanner threads to clean up.
@@ -478,27 +148,6 @@ class HdfsScanNode : public ScanNode {
   /// being processed by scanner threads, but no new ScannerThreads should be started.
   bool all_ranges_started_;
 
-  /// Pool for allocating some amounts of memory that is shared between scanners.
-  /// e.g. partition key tuple and their string buffers
-  boost::scoped_ptr<MemPool> scan_node_pool_;
-
-  /// Status of failed operations.  This is set in the ScannerThreads
-  /// Returned in GetNext() if an error occurred.  An non-ok status triggers cleanup
-  /// scanner threads.
-  Status status_;
-
-  /// Mapping of file formats (file type, compression type) to the number of
-  /// splits of that type and the lock protecting it.
-  /// This lock cannot be taken together with any other lock except lock_.
-  SpinLock file_type_counts_lock_;
-  typedef std::map<
-      std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
-  FileTypeCountsMap file_type_counts_;
-
-  /// If true, counters are actively running and need to be reported in the runtime
-  /// profile.
-  bool counters_running_;
-
   /// The id of the callback added to the thread resource manager when thread token
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
@@ -519,11 +168,6 @@ class HdfsScanNode : public ScanNode {
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
 
-  /// Create and open new scanner for this partition type.
-  /// If the scanner is successfully created, it is returned in 'scanner'.
-  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
-      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
-
   /// Main function for scanner thread. This thread pulls the next range to be
   /// processed from the IoMgr and then processes the entire range end to end.
   /// This thread terminates when all scan ranges are complete or an error occurred.
@@ -546,42 +190,9 @@ class HdfsScanNode : public ScanNode {
   /// Checks for eos conditions and returns batches from materialized_row_batches_.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
 
-  /// sets done_ to true and triggers threads to cleanup. Cannot be calld with
+  /// sets done_ to true and triggers threads to cleanup. Cannot be called with
   /// any locks taken. Calling it repeatedly ignores subsequent calls.
   void SetDone();
-
-  /// Stops periodic counters and aggregates counter values for the entire scan node.
-  /// This should be called as soon as the scan node is complete to get the most accurate
-  /// counter values.
-  /// This can be called multiple times, subsequent calls will be ignored.
-  /// This must be called on Close() to unregister counters.
-  void StopAndFinalizeCounters();
-
-  /// Recursively initializes all NULL collection slots to an empty CollectionValue in
-  /// addition to maintaining the null bit. Hack to allow UnnestNode to project out
-  /// collection slots. Assumes that the null bit has already been un/set.
-  /// TODO: remove this function once the TODOs in UnnestNode regarding projection
-  /// have been addressed.
-  void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const;
-
-  /// Helper to call InitNullCollectionValues() on all tuples produced by this scan
-  /// in 'row_batch'.
-  void InitNullCollectionValues(RowBatch* row_batch) const;
-
-  /// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered
-  /// and therefore not processed. 'file_type' is the the format of 'file', and is used
-  /// for bookkeeping. Returns true if all filters pass or are not present.
-  bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs,
-      const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
-
-  /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns
-  /// true if all filters arrived within the time limit (as measured from the time of
-  /// RuntimeFilterBank::RegisterFilter()), false otherwise.
-  bool WaitForRuntimeFilters(int32_t time_ms);
-
-  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
-  /// for the query.
-  Status TriggerDebugAction();
 };
 
 }
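
A useful way to read the hunks above: the commit splits the old HdfsScanNode into a shared HdfsScanNodeBase plus concrete nodes, and scanners now ask the node HasRowBatchQueue() instead of carrying their own add_batches_to_queue_ flag. Below is a minimal, self-contained sketch of that dispatch pattern (mock classes only; the real declarations carry much more state, and the single-threaded node is added elsewhere in this patch series):

    #include <iostream>

    // Mock stand-in for HdfsScanNodeBase: scanners branch on this virtual
    // instead of a per-scanner boolean flag.
    class HdfsScanNodeBaseSketch {
     public:
      virtual ~HdfsScanNodeBaseSketch() {}
      virtual bool HasRowBatchQueue() const = 0;
    };

    // Legacy node: spawns scanner threads that push batches into a shared queue.
    class HdfsScanNodeSketch : public HdfsScanNodeBaseSketch {
     public:
      virtual bool HasRowBatchQueue() const { return true; }
    };

    // Single-threaded node: the fragment thread pulls batches via GetNext().
    class HdfsScanNodeMtSketch : public HdfsScanNodeBaseSketch {
     public:
      virtual bool HasRowBatchQueue() const { return false; }
    };

    int main() {
      HdfsScanNodeSketch legacy;
      HdfsScanNodeMtSketch mt;
      std::cout << legacy.HasRowBatchQueue() << " " << mt.HasRowBatchQueue() << std::endl;
      return 0;
    }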

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 2b053ca..e6da220 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -37,7 +37,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_
     FieldLocation* fields, int num_tuples, int max_added_tuples,
     int slots_per_tuple, int row_idx_start) {
 
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
   DCHECK(tuple_ != NULL);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
   uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index ad3d834..841d1e4 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -26,6 +26,7 @@
 #include "common/object-pool.h"
 #include "exec/text-converter.h"
 #include "exec/hdfs-scan-node.h"
+#include "exec/hdfs-scan-node-mt.h"
 #include "exec/read-write-util.h"
 #include "exec/text-converter.inline.h"
 #include "exprs/expr-context.h"
@@ -56,20 +57,20 @@ using namespace strings;
 const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
 const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
 
-HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state,
-    bool add_batches_to_queue)
+HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : scan_node_(scan_node),
       state_(state),
-      add_batches_to_queue_(add_batches_to_queue),
       context_(NULL),
       stream_(NULL),
+      eos_(false),
       scanner_conjunct_ctxs_(NULL),
+      template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
       template_tuple_(NULL),
       tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
+      num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
       tuple_(NULL),
       batch_(NULL),
       tuple_mem_(NULL),
-      num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
       parse_status_(Status::OK()),
       decompression_type_(THdfsCompression::NONE),
       data_buffer_pool_(new MemPool(scan_node->mem_tracker())),
@@ -80,16 +81,17 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state,
 HdfsScanner::HdfsScanner()
     : scan_node_(NULL),
       state_(NULL),
-      add_batches_to_queue_(true),
       context_(NULL),
       stream_(NULL),
+      eos_(false),
       scanner_conjunct_ctxs_(NULL),
+      template_tuple_pool_(NULL),
       template_tuple_(NULL),
       tuple_byte_size_(-1),
+      num_null_bytes_(-1),
       tuple_(NULL),
       batch_(NULL),
       tuple_mem_(NULL),
-      num_null_bytes_(-1),
       parse_status_(Status::OK()),
       decompression_type_(THdfsCompression::NONE),
       data_buffer_pool_(NULL),
@@ -107,19 +109,23 @@ Status HdfsScanner::Open(ScannerContext* context) {
 
   // Clone the scan node's conjuncts map. The cloned contexts must be closed by the
   // caller.
-  HdfsScanNode::ConjunctsMap::const_iterator iter = scan_node_->conjuncts_map().begin();
-  for (; iter != scan_node_->conjuncts_map().end(); ++iter) {
-    RETURN_IF_ERROR(Expr::CloneIfNotExists(iter->second,
-        scan_node_->runtime_state(), &scanner_conjuncts_map_[iter->first]));
+  for (const auto& entry: scan_node_->conjuncts_map()) {
+    RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second,
+        scan_node_->runtime_state(), &scanner_conjuncts_map_[entry.first]));
   }
   DCHECK(scanner_conjuncts_map_.find(scan_node_->tuple_desc()->id()) !=
          scanner_conjuncts_map_.end());
   scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()];
 
   // Initialize the template_tuple_.
-  template_tuple_ = scan_node_->InitTemplateTuple(
-      state_, context_->partition_descriptor()->partition_key_value_ctxs());
+  vector<ExprContext*> partition_key_value_ctxs;
+  RETURN_IF_ERROR(Expr::CloneIfNotExists(
+      context_->partition_descriptor()->partition_key_value_ctxs(), state_,
+      &partition_key_value_ctxs));
+  template_tuple_ = scan_node_->InitTemplateTuple(partition_key_value_ctxs,
+      template_tuple_pool_.get(), state_);
   template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
+  Expr::Close(partition_key_value_ctxs, state_);
 
   decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime");
   return Status::OK();
@@ -127,10 +133,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
 
 void HdfsScanner::Close(RowBatch* row_batch) {
   if (decompressor_.get() != NULL) decompressor_->Close();
-  HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin();
-  for (; iter != scanner_conjuncts_map_.end(); ++iter) {
-    Expr::Close(iter->second, state_);
-  }
+  for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_);
   obj_pool_.Clear();
   stream_ = NULL;
   context_->ClearStreams();
@@ -158,7 +161,7 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
 }
 
 Status HdfsScanner::StartNewRowBatch() {
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
   batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
       scan_node_->mem_tracker());
   int64_t tuple_buffer_size;
@@ -168,7 +171,7 @@ Status HdfsScanner::StartNewRowBatch() {
 }
 
 int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) {
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
   DCHECK(batch_ != NULL);
   DCHECK_GT(batch_->capacity(), batch_->num_rows());
   *pool = batch_->tuple_data_pool();
@@ -190,7 +193,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool
 
 // TODO(skye): have this check scan_node_->ReachedLimit() and get rid of manual check?
 Status HdfsScanner::CommitRows(int num_rows) {
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
   DCHECK(batch_ != NULL);
   DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
   batch_->CommitRows(num_rows);
@@ -201,16 +204,15 @@ Status HdfsScanner::CommitRows(int num_rows) {
   // if no rows passed predicates.
   if (batch_->AtCapacity() || context_->num_completed_io_buffers() > 0) {
     context_->ReleaseCompletedResources(batch_, /* done */ false);
-    scan_node_->AddMaterializedRowBatch(batch_);
+    static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(batch_);
     RETURN_IF_ERROR(StartNewRowBatch());
   }
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
   RETURN_IF_ERROR(state_->GetQueryStatus());
   // Free local expr allocations for this thread
-  HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin();
-  for (; iter != scanner_conjuncts_map_.end(); ++iter) {
-    ExprContext::FreeLocalAllocations(iter->second);
+  for (const auto& entry: scanner_conjuncts_map_) {
+    ExprContext::FreeLocalAllocations(entry.second);
   }
   return Status::OK();
 }
@@ -357,8 +359,9 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
 // eval_fail:                                        ; preds = %parse
 //   ret i1 false
 // }
-Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* codegen,
-    const vector<ExprContext*>& conjunct_ctxs, Function** write_complete_tuple_fn) {
+Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
+    LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs,
+    Function** write_complete_tuple_fn) {
   *write_complete_tuple_fn = NULL;
   SCOPED_TIMER(codegen->codegen_timer());
   RuntimeState* state = node->runtime_state();
@@ -565,8 +568,9 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* c
   return Status::OK();
 }
 
-Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, LlvmCodeGen* codegen,
-    Function* write_complete_tuple_fn, Function** write_aligned_tuples_fn) {
+Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNodeBase* node,
+    LlvmCodeGen* codegen, Function* write_complete_tuple_fn,
+    Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
   SCOPED_TIMER(codegen->codegen_timer());
   DCHECK(write_complete_tuple_fn != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6245e5f..d9fa424 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -27,7 +27,7 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
-#include "exec/hdfs-scan-node.h"
+#include "exec/hdfs-scan-node-base.h"
 #include "exec/scan-node.h"
 #include "exec/scanner-context.h"
 #include "runtime/disk-io-mgr.h"
@@ -75,8 +75,10 @@ struct FieldLocation {
 /// 1. Open()
 /// 2. ProcessSplit() or GetNext()*
 /// 3. Close()
-/// The scanner can be used in either of two modes, indicated via the add_batches_to_queue
-/// c'tor parameter.
+/// The scanner can be used in either of two modes. Which mode is expected to be used
+/// depends on the type of parent scan node. Parent scan nodes with a row batch queue
+/// are expected to call ProcessSplit() and not GetNext(). Row batches will be added to
+/// the scan node's row batch queue, including the final one in Close().
 /// ProcessSplit() scans the split and adds materialized row batches to the scan node's
 /// row batch queue until the split is complete or an error occurred.
 /// GetNext() provides an iterator-like interface where the caller can request
@@ -103,16 +105,15 @@ struct FieldLocation {
 /// resources (IO buffers and mem pools) to the current row batch, and passing row batches
 /// up to the scan node. Subclasses can also use GetMemory() to help with per-row memory
 /// management.
+/// TODO: Have a pass over all members and move them out of the base class if sensible
+/// to clarify which state each concrete scanner type actually has.
 class HdfsScanner {
  public:
   /// Assumed size of an OS file block.  Used mostly when reading file format headers, etc.
   /// This probably ought to be a derived number from the environment.
   const static int FILE_BLOCK_SIZE = 4096;
 
-  /// If 'add_batches_to_queue' is true the caller must call ProcessSplit() and not
-  /// GetNext(). Row batches will be added to the scan node's row batch queue, including
-  /// the final one in Close().
-  HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, bool add_batches_to_queue);
+  HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
 
   virtual ~HdfsScanner();
 
@@ -125,16 +126,16 @@ class HdfsScanner {
   /// returned, 'parse_status_' is guaranteed to be OK as well.
   /// The memory referenced by the tuples is valid until this or any subsequently
   /// returned batch is reset or destroyed.
-  /// Only valid to call if 'add_batches_to_queue_' is false.
-  Status GetNext(RowBatch* row_batch, bool* eos) {
-    DCHECK(!add_batches_to_queue_);
-    return GetNextInternal(row_batch, eos);
+  /// Only valid to call if the parent scan node is single-threaded.
+  Status GetNext(RowBatch* row_batch) {
+    DCHECK(!scan_node_->HasRowBatchQueue());
+    return GetNextInternal(row_batch);
   }
 
   /// Process an entire split, reading bytes from the context's streams.  Context is
   /// initialized with the split data (e.g. template tuple, partition descriptor, etc).
   /// This function should only return on error or end of scan range.
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is multi-threaded.
   virtual Status ProcessSplit() = 0;
 
   /// Transfers the ownership of memory backing returned tuples such as IO buffers
@@ -143,9 +144,15 @@ class HdfsScanner {
   /// that are not backing returned rows (e.g. temporary decompression buffers).
   virtual void Close(RowBatch* row_batch);
 
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is single-threaded.
+  bool eos() const {
+    DCHECK(!scan_node_->HasRowBatchQueue());
+    return eos_;
+  }
+
+  /// Only valid to call if the parent scan node is multi-threaded.
   RowBatch* batch() const {
-    DCHECK(add_batches_to_queue_);
+    DCHECK(scan_node_->HasRowBatchQueue());
     return batch_;
   }
 
@@ -177,16 +184,11 @@ class HdfsScanner {
 
  protected:
   /// The scan node that started this scanner
-  HdfsScanNode* scan_node_;
+  HdfsScanNodeBase* scan_node_;
 
   /// RuntimeState for error reporting
   RuntimeState* state_;
 
-  /// True if the creator of this scanner intends to use ProcessSplit() and not GetNext).
-  /// Row batches will be added to the scan node's row batch queue, including the final
-  /// one in Close().
-  const bool add_batches_to_queue_;
-
   /// Context for this scanner
   ScannerContext* context_;
 
@@ -196,14 +198,24 @@ class HdfsScanner {
   /// The first stream for context_
   ScannerContext::Stream* stream_;
 
+  /// Set if this scanner has processed all ranges and will not produce more rows.
+  /// Only relevant when calling the GetNext() interface.
+  bool eos_;
+
   /// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner
   /// has its own ExprContexts so the conjuncts can be safely evaluated in parallel.
-  HdfsScanNode::ConjunctsMap scanner_conjuncts_map_;
+  HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_;
 
   // Convenience reference to scanner_conjuncts_map_[scan_node_->tuple_idx()] for scanners
   // that do not support nested types.
   const std::vector<ExprContext*>* scanner_conjunct_ctxs_;
 
+  /// Holds memory for template tuples. The memory in this pool must remain valid as
+  /// long as the row batches produced by this scanner do. This typically means the
+  /// ownership is transferred to the last row batch in Close(). Some scanners instead
+  /// transfer ownership to the parent scan node, due to being closed multiple times.
+  boost::scoped_ptr<MemPool> template_tuple_pool_;
+
   /// A template tuple is a partially-materialized tuple with only partition key slots set
   /// (or other default values, such as NULL for columns missing in a file).  The other
   /// slots are set to NULL.  The template tuple must be copied into output tuples before
@@ -211,9 +223,8 @@ class HdfsScanner {
   ///
   /// Each tuple descriptor (i.e. scan_node_->tuple_desc() and any collection item tuple
   /// descs) has a template tuple, or NULL if there are no partition key or default slots.
-  /// Template tuples are computed once for each file and valid for the duration of that
-  /// file.  They are owned by the HDFS scan node, although each scanner has its own
-  /// template tuples.
+  /// Template tuples are computed once for each file and are allocated from
+  /// template_tuple_pool_.
   std::map<const TupleDescriptor*, Tuple*> template_tuple_map_;
 
   /// Convenience variable set to the top-level template tuple
@@ -221,7 +232,10 @@ class HdfsScanner {
   Tuple* template_tuple_;
 
   /// Fixed size of each top-level tuple, in bytes
-  int tuple_byte_size_;
+  const int32_t tuple_byte_size_;
+
+  /// Number of null bytes in the top-level tuple.
+  const int32_t num_null_bytes_;
 
   /// Current tuple pointer into tuple_mem_.
   Tuple* tuple_;
@@ -239,9 +253,6 @@ class HdfsScanner {
   /// Helper class for converting text to other types;
   boost::scoped_ptr<TextConverter> text_converter_;
 
-  /// Number of null bytes in the top-level tuple.
-  int32_t num_null_bytes_;
-
   /// Contains current parse status to minimize the number of Status objects returned.
   /// This significantly minimizes the cross compile dependencies for llvm since status
   /// objects inline a bunch of string functions.  Also, status objects aren't extremely
@@ -269,8 +280,8 @@ class HdfsScanner {
   WriteTuplesFn write_tuples_fn_;
 
   /// Implements GetNext(). Should be overridden by subclasses.
-  /// May be called even if 'add_batches_to_queue_' is true.
-  virtual Status GetNextInternal(RowBatch* row_batch, bool* eos) {
+  /// Only valid to call if the parent scan node is single-threaded.
+  virtual Status GetNextInternal(RowBatch* row_batch) {
     DCHECK(false) << "GetNextInternal() not implemented for this scanner type.";
     return Status::OK();
   }
@@ -283,7 +294,7 @@ class HdfsScanner {
       THdfsFileFormat::type type, const std::string& scanner_name);
 
   /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is multi-threaded.
   Status StartNewRowBatch();
 
   /// Reset internal state for a new scan range.
@@ -297,7 +308,7 @@ class HdfsScanner {
   /// current row batch is complete and a new one is allocated).
   /// Memory returned from this call is invalidated after calling CommitRows().
   /// Callers must call GetMemory() again after calling this function.
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is multi-threaded.
   int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem);
 
   /// Gets memory for outputting tuples into the CollectionValue being constructed via
@@ -316,15 +327,15 @@ class HdfsScanner {
   /// Returns Status::OK if the query is not cancelled and hasn't exceeded any mem limits.
   /// Scanner can call this with 0 rows to flush any pending resources (attached pools
   /// and io buffers) to minimize memory consumption.
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is multi-threaded.
   Status CommitRows(int num_rows);
 
   /// Release all memory in 'pool' to batch_. If commit_batch is true, the row batch
   /// will be committed. commit_batch should be true if the attached pool is expected
   /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage.
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is multi-threaded.
   void AttachPool(MemPool* pool, bool commit_batch) {
-    DCHECK(add_batches_to_queue_);
+    DCHECK(scan_node_->HasRowBatchQueue());
     DCHECK(batch_ != NULL);
     DCHECK(pool != NULL);
     batch_->tuple_data_pool()->AcquireData(pool, false);
@@ -359,7 +370,7 @@ class HdfsScanner {
   /// Returns the number of tuples added to the row batch.  This can be less than
   /// num_tuples/tuples_till_limit because of failed conjuncts.
   /// Returns -1 if parsing should be aborted due to parse errors.
-  /// Only valid to call if 'add_batches_to_queue_' is true.
+  /// Only valid to call if the parent scan node is multi-threaded.
   int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size,
       FieldLocation* fields, int num_tuples,
       int max_added_tuples, int slots_per_tuple, int row_start_indx);
@@ -410,7 +421,7 @@ class HdfsScanner {
   /// Codegen function to replace WriteCompleteTuple. Should behave identically
   /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
   /// if codegen was successful or NULL otherwise.
-  static Status CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*,
+  static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
       const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** write_complete_tuple_fn);
 
@@ -418,7 +429,7 @@ class HdfsScanner {
   /// compiled to IR.  This function loads the precompiled IR function, modifies it,
   /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
   /// successful or NULL otherwise.
-  static Status CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*,
+  static Status CodegenWriteAlignedTuples(HdfsScanNodeBase*, LlvmCodeGen*,
       llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
 
   /// Report parse error for column @ desc.   If abort_on_error is true, sets
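
The eos()/GetNext() contract added above implies a simple pull loop on the single-threaded side, while queue-mode callers keep using ProcessSplit(). A hedged, runnable sketch of that pull loop with mock types (MockScanner and MockBatch are illustrations, not Impala classes; Status handling is elided):

    #include <cstdio>

    struct MockBatch { int rows = 0; };

    class MockScanner {
     public:
      // Fills 'batch' with up to one batch of rows and updates eos_, mirroring
      // the shape of HdfsScanner::GetNext(RowBatch*) without error handling.
      void GetNext(MockBatch* batch) {
        batch->rows = remaining_ < 32 ? remaining_ : 32;
        remaining_ -= batch->rows;
        eos_ = (remaining_ == 0);
      }
      bool eos() const { return eos_; }

     private:
      int remaining_ = 100;  // pretend the scan range holds 100 rows
      bool eos_ = false;
    };

    int main() {
      MockScanner scanner;
      while (!scanner.eos()) {
        MockBatch batch;
        scanner.GetNext(&batch);
        std::printf("materialized %d rows\n", batch.rows);
      }
      return 0;
    }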

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index ec52901..03a07ce 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -41,9 +41,9 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6
 
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
-HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state,
-    bool add_batches_to_queue)
-    : BaseSequenceScanner(scan_node, state, add_batches_to_queue),
+HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNodeBase* scan_node,
+    RuntimeState* state)
+    : BaseSequenceScanner(scan_node, state),
       unparsed_data_buffer_(NULL),
       num_buffered_records_in_compressed_block_(0) {
 }
@@ -52,7 +52,7 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 }
 
 // Codegen for materialized parsed data into tuples.
-Status HdfsSequenceScanner::Codegen(HdfsScanNode* node,
+Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
   if (!node->runtime_state()->codegen_enabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index e2dcee9..5d6074f 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -162,8 +162,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// SeqFile file: {'S', 'E', 'Q', 6}
   static const uint8_t SEQFILE_VERSION_HEADER[4];
 
-  HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state,
-      bool add_batches_to_queue);
+  HdfsSequenceScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
 
   virtual ~HdfsSequenceScanner();
 
@@ -172,7 +171,8 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+  static Status Codegen(HdfsScanNodeBase* node,
+      const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** write_aligned_tuples_fn);
 
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f1c80b0..f3f0a7c 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -50,9 +50,8 @@ const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index";
 // progress.
 const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024;
 
-HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
-    bool add_batches_to_queue)
-    : HdfsScanner(scan_node, state, add_batches_to_queue),
+HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+    : HdfsScanner(scan_node, state),
       byte_buffer_ptr_(NULL),
       byte_buffer_end_(NULL),
       byte_buffer_read_size_(0),
@@ -67,7 +66,7 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
 HdfsTextScanner::~HdfsTextScanner() {
 }
 
-Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
+Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges;
   int compressed_text_files = 0;
@@ -108,7 +107,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
           // Populate the list of compressed text scan ranges.
           DCHECK_GT(files[i]->file_length, 0);
           ScanRangeMetadata* metadata =
-              reinterpret_cast<ScanRangeMetadata*>(split->meta_data());
+              static_cast<ScanRangeMetadata*>(split->meta_data());
           DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(
               files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0,
               metadata->partition_id, split->disk_id(), split->try_cache(),
@@ -149,7 +148,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node,
 }
 
 Status HdfsTextScanner::ProcessSplit() {
-  DCHECK(add_batches_to_queue_);
+  DCHECK(scan_node_->HasRowBatchQueue());
 
   // Reset state for new scan range
   RETURN_IF_ERROR(InitNewRange());
@@ -187,12 +186,19 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
     decompressor_.reset(NULL);
   }
   if (row_batch != NULL) {
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
     context_->ReleaseCompletedResources(row_batch, true);
-    if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch);
+    }
+  } else {
+    if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll();
   }
+
   // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
@@ -687,7 +693,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 // Codegen for materializing parsed data into tuples.  The function WriteCompleteTuple is
 // codegen'd using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Status HdfsTextScanner::Codegen(HdfsScanNode* node,
+Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
   if (!node->runtime_state()->codegen_enabled()) {
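
The Close() hunk above shows the memory hand-off this patch depends on: template_tuple_pool_ is either attached to the caller's row batch via AcquireData() or freed outright, and the trailing DCHECKs assert that the scanner's pools end up empty. A rough self-contained sketch of that idiom (MockPool only approximates MemPool; for example, it omits the keep_current argument):

    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <utility>
    #include <vector>

    // AcquireData() moves every chunk from 'src' into this pool, so the memory
    // stays valid for as long as the acquiring pool (e.g. a row batch's pool).
    class MockPool {
     public:
      char* Allocate(std::size_t n) {
        chunks_.emplace_back(new char[n]);
        bytes_ += n;
        return chunks_.back().get();
      }
      void AcquireData(MockPool* src) {
        for (auto& c : src->chunks_) chunks_.push_back(std::move(c));
        bytes_ += src->bytes_;
        src->chunks_.clear();
        src->bytes_ = 0;
      }
      std::size_t total_allocated_bytes() const { return bytes_; }

     private:
      std::vector<std::unique_ptr<char[]>> chunks_;
      std::size_t bytes_ = 0;
    };

    int main() {
      MockPool template_tuple_pool, batch_pool;
      template_tuple_pool.Allocate(64);            // e.g. a template tuple
      batch_pool.AcquireData(&template_tuple_pool);
      // Mirrors the DCHECK_EQ in Close(): the scanner pool must end up empty.
      assert(template_tuple_pool.total_allocated_bytes() == 0);
      return 0;
    }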

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 937626f..e68d45f 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -47,8 +47,7 @@ struct HdfsFileDesc;
 /// scanner for the tuple directly after it.
 class HdfsTextScanner : public HdfsScanner {
  public:
-  HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state,
-      bool add_batches_to_queue);
+  HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
   virtual ~HdfsTextScanner();
 
   /// Implementation of HdfsScanner interface.
@@ -57,12 +56,13 @@ class HdfsTextScanner : public HdfsScanner {
   virtual void Close(RowBatch* row_batch);
 
   /// Issue io manager byte ranges for 'files'.
-  static Status IssueInitialRanges(HdfsScanNode* scan_node,
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
-  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+  static Status Codegen(HdfsScanNodeBase* node,
+      const std::vector<ExprContext*>& conjunct_ctxs,
       llvm::Function** write_aligned_tuples_fn);
 
   /// Suffix for lzo index files.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 69d682e..033ab82 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -19,6 +19,7 @@
 
 #include <gutil/strings/substitute.h>
 
+#include "exec/hdfs-scan-node-base.h"
 #include "exec/hdfs-scan-node.h"
 #include "runtime/row-batch.h"
 #include "runtime/mem-pool.h"
@@ -40,7 +41,7 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 // output_buffer_bytes_left_ will be set to something else.
 static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
-ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNode* scan_node,
+ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
     HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range,
     const vector<FilterContext>& filter_ctxs)
   : state_(state),
@@ -311,7 +312,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
 }
 
 bool ScannerContext::cancelled() const {
-  return scan_node_->done_;
+  if (!scan_node_->HasRowBatchQueue()) return false;
+  return static_cast<HdfsScanNode*>(scan_node_)->done();
 }
 
 Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 9178324..1f1bc0d 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -31,7 +31,7 @@ namespace impala {
 
 struct HdfsFileDesc;
 class HdfsPartitionDescriptor;
-class HdfsScanNode;
+class HdfsScanNodeBase;
 class MemPool;
 class RowBatch;
 class RuntimeState;
@@ -55,12 +55,14 @@ class TupleRow;
 ///      from processing the bytes. This is the consumer.
 ///   3. The scan node/main thread which calls into the context to trigger cancellation
 ///      or other end of stream conditions.
+/// TODO: Some of the synchronization mechanisms such as cancelled() can be removed
+/// once the legacy hdfs scan node has been removed.
 class ScannerContext {
  public:
   /// Create a scanner context with the parent scan_node (where materialized row batches
   /// get pushed to) and the scan range to process.
   /// This context starts with 1 stream.
-  ScannerContext(RuntimeState*, HdfsScanNode*, HdfsPartitionDescriptor*,
+  ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
       DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs);
 
   /// Destructor verifies that all stream objects have been released.
@@ -295,7 +297,8 @@ class ScannerContext {
   /// The stream is created in the runtime state's object pool
   Stream* AddStream(DiskIoMgr::ScanRange* range);
 
-  /// If true, the ScanNode has been cancelled and the scanner thread should finish up
+  /// Returns true if scan_node_ is multi-threaded and has been cancelled.
+  /// Always returns false if the scan_node_ is not multi-threaded.
   bool cancelled() const;
 
   int num_completed_io_buffers() const { return num_completed_io_buffers_; }
@@ -306,7 +309,7 @@ class ScannerContext {
   friend class Stream;
 
   RuntimeState* state_;
-  HdfsScanNode* scan_node_;
+  HdfsScanNodeBase* scan_node_;
 
   HdfsPartitionDescriptor* partition_desc_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 3c5d5af..09de6a5 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -110,8 +110,9 @@ class ExprContext {
     return fn_contexts_[i];
   }
 
-  Expr* root() { return root_; }
-  bool closed() { return closed_; }
+  Expr* root() const { return root_; }
+  bool closed() const { return closed_; }
+  bool is_clone() const { return is_clone_; }
 
   /// Calls Get*Val on root_
   BooleanVal GetBooleanVal(TupleRow* row);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 10811ea..3a3e399 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -19,7 +19,6 @@
 #ifndef IMPALA_RUNTIME_TUPLE_H
 #define IMPALA_RUNTIME_TUPLE_H
 
-#include <cstring>
 #include "codegen/impala-ir.h"
 #include "common/logging.h"
 #include "gutil/macros.h"
@@ -70,9 +69,7 @@ class Tuple {
     return result;
   }
 
-  void Init(int size) {
-    bzero(this, size);
-  }
+  void Init(int size) { memset(this, 0, size); }
 
   /// The total size of all data represented in this tuple (tuple data and referenced
   /// string and collection data).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index f4b070f..bf03d98 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -183,10 +183,9 @@ struct TQueryOptions {
   43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
 
   // Multi-threaded execution: number of cores per query per node.
-  // > 1: multi-threaded execution mode, with given number of cores
-  // 1: single-threaded execution mode
-  // 0: multi-threaded execution mode, number of cores is the pool default
-  44: optional i32 mt_num_cores = 1
+  // > 0: multi-threaded execution mode, with given number of cores
+  // 0: single-threaded execution mode
+  44: optional i32 mt_num_cores = 0
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
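
With the renumbering, 0 (the new default) selects single-threaded execution and any positive value opts into multi-threaded execution, which is exactly the mt_num_cores > 0 test in the Frontend.java hunk below. A tiny sketch of that interpretation (MockQueryOptions stands in for the thrift-generated TQueryOptions):

    #include <cassert>
    #include <cstdint>

    // Stand-in for the thrift-generated TQueryOptions struct.
    struct MockQueryOptions { int32_t mt_num_cores = 0; };  // new default

    // > 0: multi-threaded execution with that many cores; 0: single-threaded.
    bool UseMultiThreadedExecution(const MockQueryOptions& opts) {
      return opts.mt_num_cores > 0;
    }

    int main() {
      MockQueryOptions opts;
      assert(!UseMultiThreadedExecution(opts));  // default is single-threaded
      opts.mt_num_cores = 4;
      assert(UseMultiThreadedExecution(opts));
      return 0;
    }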

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/fe/src/main/java/com/cloudera/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
index 83af818..2d38396 100644
--- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java
+++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
@@ -956,7 +956,7 @@ public class Frontend {
     LOG.debug("create plan");
     Planner planner = new Planner(analysisResult, queryCtx);
     if (RuntimeEnv.INSTANCE.isTestEnv()
-        && queryCtx.request.query_options.mt_num_cores != 1) {
+        && queryCtx.request.query_options.mt_num_cores > 0) {
       // TODO: this is just to be able to run tests; implement this
       List<PlanFragment> planRoots = planner.createParallelPlans();
       for (PlanFragment planRoot: planRoots) {