You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/11 23:21:35 UTC

impala git commit: IMPALA-7095: clean up scan node profiles

Repository: impala
Updated Branches:
  refs/heads/master 192771511 -> 5d672457c


IMPALA-7095: clean up scan node profiles

Add counters to scan node implementations where they make sense but were
missing (e.g. row batch queue counters for multithreaded Kudu scans) and
remove them where they don't make sense (e.g. scanner thread counters
for non-multithreaded scans).

Refactors the multithreaded Kudu and HDFS scans to share logic via
composition (single inheritance doesn't work for this case),
which enables the same set of counters to be maintained with shared
code. The row batch queueing and thread tracking are now shared. I looked
at combining the logic around 'status_', 'lock_' and 'done_' between the
two but the details were different enough that it didn't seem worth
abstracting.

Adds a PeakScannerThreadConcurrency counter - this answers a common
question.

Fixes RowsRead for data source scans.

Fixes some of the comments to be more accurate/useful.

Testing:
Ran exhaustive tests. Ran various types of scans (HDFS, Kudu, HBase,
Data source) and inspected the profile output manually.

Change-Id: I77286282d42e7764bfdf94c7ec47cec9d743f787
Reviewed-on: http://gerrit.cloudera.org:8080/10810
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5d672457
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5d672457
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5d672457

Branch: refs/heads/master
Commit: 5d672457cf05a10dc56c97609f873252b7c65ec1
Parents: 1927715
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu May 31 12:04:40 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Jul 11 23:19:39 2018 +0000

----------------------------------------------------------------------
 be/src/exec/data-source-scan-node.cc      |   6 +-
 be/src/exec/data-source-scan-node.h       |  12 +-
 be/src/exec/hbase-scan-node.cc            |   7 +-
 be/src/exec/hbase-scan-node.h             |  21 +-
 be/src/exec/hbase-table-scanner.cc        |   2 +-
 be/src/exec/hdfs-scan-node-base.cc        |  24 +--
 be/src/exec/hdfs-scan-node-base.h         |  55 ++++-
 be/src/exec/hdfs-scan-node-mt.h           |  10 +-
 be/src/exec/hdfs-scan-node.cc             | 102 ++-------
 be/src/exec/hdfs-scan-node.h              |  63 ++----
 be/src/exec/hdfs-text-scanner.cc          |   3 +-
 be/src/exec/kudu-scan-node-base.h         |  10 +-
 be/src/exec/kudu-scan-node-mt.cc          |   2 +-
 be/src/exec/kudu-scan-node.cc             |  56 ++---
 be/src/exec/kudu-scan-node.h              |  32 +--
 be/src/exec/scan-node.cc                  | 155 +++++++++++--
 be/src/exec/scan-node.h                   | 288 +++++++++++++++++--------
 be/src/runtime/fragment-instance-state.cc |   2 +-
 be/src/util/blocking-queue.h              |   5 +
 be/src/util/thread.h                      |   2 +
 20 files changed, 498 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index d330710..88cf11b 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -331,6 +331,7 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
   ScalarExprEvaluator* const* evals = conjunct_evals_.data();
   int num_conjuncts = conjuncts_.size();
   DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
+  int64_t rows_read = 0;
 
   while (true) {
     {
@@ -338,6 +339,7 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
       // copy rows until we hit the limit/capacity or until we exhaust input_batch_
       while (!ReachedLimit() && !row_batch->AtCapacity() && InputBatchHasNext()) {
         RETURN_IF_ERROR(MaterializeNextRow(state->local_time_zone(), tuple_pool, tuple));
+        ++rows_read;
         int row_idx = row_batch->AddRow();
         TupleRow* tuple_row = row_batch->GetRow(row_idx);
         tuple_row->SetTuple(tuple_idx_, tuple);
@@ -350,10 +352,10 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
         }
         ++next_row_idx_;
       }
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-
       if (ReachedLimit() || row_batch->AtCapacity() || input_batch_->eos) {
         *eos = ReachedLimit() || input_batch_->eos;
+        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+        COUNTER_ADD(rows_read_counter_, rows_read);
         return Status::OK();
       }
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/data-source-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h
index 09c0c71..065fcf3 100644
--- a/be/src/exec/data-source-scan-node.h
+++ b/be/src/exec/data-source-scan-node.h
@@ -46,23 +46,23 @@ class DataSourceScanNode : public ScanNode {
   ~DataSourceScanNode();
 
   /// Load the data source library and create the ExternalDataSourceExecutor.
-  virtual Status Prepare(RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state) override;
 
   /// Open the data source and initialize the first row batch.
-  virtual Status Open(RuntimeState* state);
+  virtual Status Open(RuntimeState* state) override;
 
   /// Fill the next row batch, calls GetNext() on the external scanner.
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
 
   /// NYI
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state) override;
 
   /// Close the scanner, and report errors.
-  virtual void Close(RuntimeState* state);
+  virtual void Close(RuntimeState* state) override;
 
  protected:
   /// Write debug string of this into out.
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+  virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
  private:
   /// Used to call the external data source.

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index a74d4b3..5aee999 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -57,7 +57,8 @@ HBaseScanNode::~HBaseScanNode() {
 
 Status HBaseScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ScanNode::Prepare(state));
-  read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HBASE_READ_TIMER);
+  hbase_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HBASE_READ_TIMER);
+  AddBytesReadCounters();
 
   hbase_scanner_.reset(new HBaseTableScanner(this, state->htable_factory(), state));
 
@@ -152,7 +153,6 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo
   // but there's still some considerable time inside here.
   // TODO: need to understand how the time is spent inside this function.
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
 
   if (scan_range_vector_.empty() || ReachedLimit()) {
     *eos = true;
@@ -272,8 +272,7 @@ Status HBaseScanNode::Reset(RuntimeState* state) {
 void HBaseScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SCOPED_TIMER(runtime_profile_->total_time_counter());
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
+  runtime_profile_->StopPeriodicCounters();
 
   if (hbase_scanner_.get() != NULL) {
     JNIEnv* env = getJNIEnv();

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hbase-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.h b/be/src/exec/hbase-scan-node.h
index e0a036d..dbe7ff1 100644
--- a/be/src/exec/hbase-scan-node.h
+++ b/be/src/exec/hbase-scan-node.h
@@ -29,6 +29,10 @@ namespace impala {
 class TextConverter;
 class Tuple;
 
+/// Counters:
+///
+///   TotalRawHbaseReadTime - the total wall clock time spent in HBase read calls.
+///
 class HBaseScanNode : public ScanNode {
  public:
   HBaseScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
@@ -37,26 +41,28 @@ class HBaseScanNode : public ScanNode {
 
   /// Prepare conjuncts, create HBase columns to slots mapping,
   /// initialize hbase_scanner_, and create text_converter_.
-  virtual Status Prepare(RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state) override;
 
   /// Start HBase scan using hbase_scanner_.
-  virtual Status Open(RuntimeState* state);
+  virtual Status Open(RuntimeState* state) override;
 
   /// Fill the next row batch by calling Next() on the hbase_scanner_,
   /// converting text data in HBase cells to binary data.
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
 
   /// NYI
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state) override;
 
   /// Close the hbase_scanner_, and report errors.
-  virtual void Close(RuntimeState* state);
+  virtual void Close(RuntimeState* state) override;
 
   int suggested_max_caching() const { return suggested_max_caching_; }
 
+  RuntimeProfile::Counter* hbase_read_timer() const { return hbase_read_timer_; }
+
  protected:
   /// Write debug string of this into out.
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+  virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
  private:
   const static int SKIP_COLUMN = -1;
@@ -110,6 +116,9 @@ class HBaseScanNode : public ScanNode {
   /// will be 0.
   int suggested_max_caching_;
 
+  /// Total wall clock time spent reading from HBase.
+  RuntimeProfile::Counter* hbase_read_timer_ = nullptr;
+
   /// Writes a slot in tuple from an HBase value containing text data.
   /// The HBase value is converted into the appropriate target type.
   void WriteTextSlot(

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hbase-table-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc
index cf13e75..ea03b3b 100644
--- a/be/src/exec/hbase-table-scanner.cc
+++ b/be/src/exec/hbase-table-scanner.cc
@@ -505,7 +505,7 @@ Status HBaseTableScanner::Next(JNIEnv* env, bool* has_next) {
   RETURN_IF_ERROR(jni_frame.push(env));
   jobject result = NULL;
   {
-    SCOPED_TIMER(scan_node_->read_timer());
+    SCOPED_TIMER(scan_node_->hbase_read_timer());
     while (true) {
       DCHECK(resultscanner_ != NULL);
       // result_ = resultscanner_.next();

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 9842533..6454652 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -119,6 +119,7 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ScanNode::Prepare(state));
+  AddBytesReadCounters();
 
   // Prepare collection conjuncts
   for (const auto& entry: conjuncts_map_) {
@@ -342,27 +343,25 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   reader_context_ = runtime_state_->io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
-  // TODO: Revisit counters and move the counters specific to multi-threaded scans
-  // into HdfsScanNode.
-  read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
-  open_file_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_OPEN_FILE_TIMER);
+  hdfs_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
+  hdfs_open_file_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_OPEN_FILE_TIMER);
   per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
       PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
-      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
+      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_,
+      hdfs_read_timer_));
   scan_ranges_complete_counter_ =
       ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
+  collection_items_read_counter_ =
+      ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
   if (DiskInfo::num_disks() < 64) {
     num_disks_accessed_counter_ =
         ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
   } else {
     num_disks_accessed_counter_ = NULL;
   }
-  num_scanner_threads_started_counter_ =
-      ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
-
   reader_context_->set_bytes_read_counter(bytes_read_counter());
-  reader_context_->set_read_timer(read_timer());
-  reader_context_->set_open_file_timer(open_file_timer());
+  reader_context_->set_read_timer(hdfs_read_timer_);
+  reader_context_->set_open_file_timer(hdfs_open_file_timer_);
   reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
   reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
 
@@ -419,8 +418,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
 
   StopAndFinalizeCounters();
 
-  // There should be no active scanner threads and hdfs read threads.
-  DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
+  // There should be no active hdfs read threads.
   DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
 
   if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
@@ -757,7 +755,7 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
 
 void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
     const vector<THdfsCompression::type>& compression_types, bool skipped) {
-  scan_ranges_complete_counter()->Add(1);
+  scan_ranges_complete_counter_->Add(1);
   progress_.Update(1);
   HdfsCompressionTypesSet compression_set;
   for (int i = 0; i < compression_types.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 4c0a233..e044ddf 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -114,8 +114,32 @@ struct ScanRangeMetadata {
 /// Scanners may also use the same filters to eliminate rows at finer granularities
 /// (e.g. per row).
 ///
-/// TODO: Revisit and minimize metrics. Move those specific to legacy multi-threaded
-/// scans into HdfsScanNode.
+/// Counters:
+///
+///   TotalRawHdfsReadTime - the total wall clock time spent by Disk I/O threads in HDFS
+///     read operations. For example, if we have 3 reading threads and each spent 1 sec,
+///     this counter will report 3 sec.
+///
+///   TotalRawHdfsOpenFileTime - the total wall clock time spent by Disk I/O threads in
+///     HDFS open operations.
+///
+///   PerReadThreadRawHdfsThroughput - the read throughput in bytes/sec for each HDFS
+///     read thread while it is executing I/O operations on behalf of this scan.
+///
+///   NumDisksAccessed - number of distinct disks accessed by HDFS scan. Each local disk
+///     is counted as a disk and each remote disk queue (e.g. HDFS remote reads, S3)
+///     is counted as a distinct disk.
+///
+///   AverageHdfsReadThreadConcurrency - the average number of HDFS read threads
+///     executing read operations on behalf of this scan. Higher values show that this
+///     scan is using a larger proportion of the I/O capacity of the system. Lower values
+///     show that either this thread is not I/O bound or that it is getting a small share
+///     of the I/O capacity of the system because of other concurrently executing
+///     queries.
+///
+///   Hdfs Read Thread Concurrency Bucket - the bucket counting (%) of HDFS read thread
+///     concurrency.
+///
 /// TODO: Once the legacy scan node has been removed, several functions can be made
 /// non-virtual. Also merge this class with HdfsScanNodeMt.
 class HdfsScanNodeBase : public ScanNode {
@@ -123,12 +147,13 @@ class HdfsScanNodeBase : public ScanNode {
   HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~HdfsScanNodeBase();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual void Codegen(RuntimeState* state);
-  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual Status Reset(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual void Close(RuntimeState* state);
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state)
+      override WARN_UNUSED_RESULT;
+  virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual void Codegen(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual Status Reset(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual void Close(RuntimeState* state) override;
 
   /// Returns true if this node uses separate threads for scanners that append RowBatches
   /// to a queue, false otherwise.
@@ -292,8 +317,6 @@ class HdfsScanNodeBase : public ScanNode {
   virtual void TransferToScanNodePool(MemPool* pool);
 
   /// map from volume id to <number of split, per volume split lengths>
-  /// TODO: move this into some global .h, no need to include this file just for this
-  /// typedef
   typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumeStats;
 
   /// Update the per volume stats with the given scan range params list
@@ -487,6 +510,18 @@ class HdfsScanNodeBase : public ScanNode {
   /// taken where there are i concurrent hdfs read thread running. Created in Open().
   std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr;
 
+  /// HDFS read throughput per Disk I/O thread [bytes/sec],
+  RuntimeProfile::Counter* per_read_thread_throughput_counter_ = nullptr;
+
+  /// Total number of disks accessed for this scan node.
+  RuntimeProfile::Counter* num_disks_accessed_counter_ = nullptr;
+
+  /// Total file read time in I/O mgr disk thread.
+  RuntimeProfile::Counter* hdfs_read_timer_ = nullptr;
+
+  /// Total time spent opening file handles in I/O mgr disk thread.
+  RuntimeProfile::Counter* hdfs_open_file_timer_ = nullptr;
+
   /// Track stats about ideal/actual reservation for initial scan ranges so we can
   /// determine if the scan got all of the reservation it wanted. Does not include
   /// subsequent reservation increases done by scanner implementation (e.g. for Parquet

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 3db232f..78d7718 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -40,13 +40,13 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
   HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~HdfsScanNodeMt();
 
-  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
-      WARN_UNUSED_RESULT;
-  virtual void Close(RuntimeState* state);
+      override WARN_UNUSED_RESULT;
+  virtual void Close(RuntimeState* state) override;
 
-  virtual bool HasRowBatchQueue() const { return false; }
+  virtual bool HasRowBatchQueue() const override { return false; }
 
  private:
   /// Current scan range and corresponding scanner.

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index e9c0864..1ceaf2c 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -38,16 +38,8 @@
 
 #include "common/names.h"
 
-DEFINE_int32(max_row_batches, 0, "the maximum size of materialized_row_batches_");
-
-// The maximum capacity of materialized_row_batches_ per scanner thread that can
-// be created. This is multiplied by 'max_num_scanner_threads_' to get an upper
-// bound on the queue size. This reduces the queue size on systems with many disks
-// and makes the num_scanner_threads query option more effective at reducing memory
-// consumption. For now, make this relatively high. We should consider lowering
-// this or using a better heuristic (e.g. based on queued memory).
-DEFINE_int32_hidden(max_queued_row_batches_per_scanner_thread, 5,
-    "(Advanced) the maximum number of queued row batches per scanner thread.");
+DEFINE_int32(max_row_batches, 0,
+    "the maximum number of batches to queue in multithreaded HDFS scans");
 
 #ifndef NDEBUG
 DECLARE_bool(skip_file_runtime_filtering);
@@ -62,8 +54,7 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20;
 
 HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
-    : HdfsScanNodeBase(pool, tnode, descs),
-      max_num_scanner_threads_(CpuInfo::num_cores()) {
+    : HdfsScanNodeBase(pool, tnode, descs) {
 }
 
 HdfsScanNode::~HdfsScanNode() {
@@ -91,9 +82,6 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     unique_lock<mutex> l(lock_);
     lock_guard<SpinLock> l2(file_type_counts_);
     StopAndFinalizeCounters();
-    row_batches_put_timer_->Set(materialized_row_batches_->total_put_wait_time());
-    row_batches_get_timer_->Set(materialized_row_batches_->total_get_wait_time());
-    row_batches_peak_mem_consumption_->Set(row_batches_mem_tracker_->peak_consumption());
   }
   return status;
 }
@@ -111,7 +99,7 @@ Status HdfsScanNode::GetNextInternal(
     return Status::OK();
   }
   *eos = false;
-  unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
+  unique_ptr<RowBatch> materialized_batch = thread_state_.batch_queue()->GetBatch();
   if (materialized_batch != NULL) {
     row_batch->AcquireState(materialized_batch.get());
     // Update the number of materialized rows now instead of when they are materialized.
@@ -140,52 +128,10 @@ Status HdfsScanNode::GetNextInternal(
   return status_;
 }
 
-Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  if (state->query_options().num_scanner_threads > 0) {
-    max_num_scanner_threads_ = state->query_options().num_scanner_threads;
-  }
-  DCHECK_GT(max_num_scanner_threads_, 0);
-
-  int default_max_row_batches = FLAGS_max_row_batches;
-  if (default_max_row_batches <= 0) {
-    // TODO: IMPALA-7096: re-evaluate this heuristic to get a tighter bound on memory
-    // consumption, we could do something better.
-    default_max_row_batches = min(
-        10 * DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS,
-        max_num_scanner_threads_ * FLAGS_max_queued_row_batches_per_scanner_thread);
-  }
-  if (state->query_options().__isset.mt_dop && state->query_options().mt_dop > 0) {
-    // To avoid a significant memory increase, adjust the number of maximally queued
-    // row batches per scan instance based on MT_DOP. The max materialized row batches
-    // is at least 2 to allow for some parallelism between the producer/consumer.
-    max_materialized_row_batches_ =
-        max(2, default_max_row_batches / state->query_options().mt_dop);
-  } else {
-    max_materialized_row_batches_ = default_max_row_batches;
-  }
-  VLOG_QUERY << "Max row batch queue size for scan node '" << id_
-      << "' in fragment instance '" << PrintId(state->fragment_instance_id())
-      << "': " << max_materialized_row_batches_;
-  materialized_row_batches_.reset(new RowBatchQueue(max_materialized_row_batches_));
-  return HdfsScanNodeBase::Init(tnode, state);
-}
-
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
-  row_batches_mem_tracker_ = state->obj_pool()->Add(new MemTracker(
-        -1, "Queued Batches", mem_tracker(), false));
-
-  row_batches_enqueued_ =
-      ADD_COUNTER(runtime_profile(), "RowBatchesEnqueued", TUnit::UNIT);
-  row_batch_bytes_enqueued_ =
-      ADD_COUNTER(runtime_profile(), "RowBatchBytesEnqueued", TUnit::BYTES);
-  row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
-  row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
-  row_batches_max_capacity_ = runtime_profile()->AddHighWaterMarkCounter(
-      "RowBatchQueueCapacity", TUnit::UNIT);
-  row_batches_peak_mem_consumption_ =
-      ADD_COUNTER(runtime_profile(), "RowBatchQueuePeakMemoryUsage", TUnit::BYTES);
+  thread_state_.Prepare(this);
   scanner_thread_reservations_denied_counter_ =
       ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", TUnit::UNIT);
   return Status::OK();
@@ -197,11 +143,7 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
 Status HdfsScanNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
-
-  if (file_descs_.empty() || progress_.done()) return Status::OK();
-
-  average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
-      AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
+  thread_state_.Open(this, FLAGS_max_row_batches);
 
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
@@ -214,9 +156,7 @@ void HdfsScanNode::Close(RuntimeState* state) {
   if (thread_avail_cb_id_ != -1) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
-  scanner_threads_.JoinAll();
-  materialized_row_batches_->Cleanup();
-  if (row_batches_mem_tracker_ != nullptr) row_batches_mem_tracker_->Close();
+  thread_state_.Close();
   HdfsScanNodeBase::Close(state);
 }
 
@@ -233,13 +173,7 @@ void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
 
 void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
   InitNullCollectionValues(row_batch.get());
-  COUNTER_ADD(row_batches_enqueued_, 1);
-  // Only need to count tuple_data_pool() bytes since after IMPALA-5307, no buffers are
-  // returned from the scan node.
-  COUNTER_ADD(row_batch_bytes_enqueued_,
-      row_batch->tuple_data_pool()->total_reserved_bytes());
-  row_batch->SetMemTracker(row_batches_mem_tracker_);
-  materialized_row_batches_->AddBatch(move(row_batch));
+  thread_state_.EnqueueBatch(move(row_batch));
 }
 
 Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
@@ -290,7 +224,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     // correct way to communicate between this method and ScannerThreadHelper
     unique_lock<mutex> lock(lock_);
 
-    const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
+    const int64_t num_active_scanner_threads = thread_state_.GetNumActive();
     const bool first_thread = num_active_scanner_threads == 0;
     const int64_t scanner_thread_reservation = resource_profile_.min_reservation;
     // Cases 1, 2, 3.
@@ -301,7 +235,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
     if (!first_thread) {
       // Cases 5 and 6.
-      if (materialized_row_batches_->Size() >= max_materialized_row_batches_) break;
+      if (thread_state_.batch_queue()->AtCapacity()) break;
       // The node's min reservation is for the first thread so we don't need to check
       if (!buffer_pool_client()->IncreaseReservation(scanner_thread_reservation)) {
         COUNTER_ADD(scanner_thread_reservations_denied_counter_, 1);
@@ -313,16 +247,15 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
     if (first_thread) {
       // The first thread is required to make progress on the scan.
       pool->AcquireThreadToken();
-    } else if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_
+    } else if (thread_state_.GetNumActive() >= thread_state_.max_num_scanner_threads()
         || !pool->TryAcquireThreadToken()) {
       ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
       break;
     }
 
-    COUNTER_ADD(&active_scanner_thread_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
-        num_scanner_threads_started_counter_->value());
+        thread_state_.GetNumStarted());
     auto fn = [this, first_thread, scanner_thread_reservation]() {
       this->ScannerThread(first_thread, scanner_thread_reservation);
     };
@@ -331,7 +264,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
     if (!status.ok()) {
       ReturnReservationFromScannerThread(lock, scanner_thread_reservation);
-      COUNTER_ADD(&active_scanner_thread_counter_, -1);
+      thread_state_.DecrementNumActive();
       // Release the token and skip running callbacks to find a replacement. Skipping
       // serves two purposes. First, it prevents a mutual recursion between this function
       // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
@@ -346,13 +279,12 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
       break;
     }
     // Thread successfully started
-    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
-    scanner_threads_.AddThread(move(t));
+    thread_state_.AddThread(move(t));
   }
 }
 
 void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reservation) {
-  SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
+  SCOPED_THREAD_COUNTER_MEASUREMENT(thread_state_.thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
@@ -437,7 +369,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   filter_mem_pool.FreeAll();
   expr_results_pool.FreeAll();
   runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
-  COUNTER_ADD(&active_scanner_thread_counter_, -1);
+  thread_state_.DecrementNumActive();
 }
 
 Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
@@ -514,7 +446,7 @@ void HdfsScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
   if (reader_context_ != nullptr) reader_context_->Cancel();
-  materialized_row_batches_->Shutdown();
+  thread_state_.Shutdown();
 }
 
 void HdfsScanNode::SetDone() {

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index a7026df..41d919e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -31,7 +31,6 @@
 #include "exec/filter-context.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "util/counting-barrier.h"
-#include "util/thread.h"
 
 namespace impala {
 
@@ -76,24 +75,22 @@ class HdfsScanNode : public HdfsScanNodeBase {
   HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~HdfsScanNode();
 
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
+  virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override
       WARN_UNUSED_RESULT;
-  virtual void Close(RuntimeState* state);
+  virtual void Close(RuntimeState* state) override;
 
-  virtual bool HasRowBatchQueue() const { return true; }
+  virtual bool HasRowBatchQueue() const override { return true; }
 
   bool done() const { return done_; }
 
   /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
   virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
-      int num_files_queued) WARN_UNUSED_RESULT;
+      int num_files_queued) override WARN_UNUSED_RESULT;
 
   /// Adds a materialized row batch for the scan node.  This is called from scanner
-  /// threads.
-  /// This function will block if materialized_row_batches_ is full.
+  /// threads. This function will block if the row batch queue is full.
   void AddMaterializedRowBatch(std::unique_ptr<RowBatch> row_batch);
 
   /// Called by scanners when a range is complete. Used to record progress and set done_.
@@ -102,25 +99,18 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// batch queue. Otherwise, we may lose the last batch due to racing with shutting down
   /// the RowBatch queue.
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
-      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);
+      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false)
+      override;
 
   /// Transfers all memory from 'pool' to 'scan_node_pool_'.
-  virtual void TransferToScanNodePool(MemPool* pool);
+  virtual void TransferToScanNodePool(MemPool* pool) override;
 
  private:
+  ScannerThreadState thread_state_;
+
   /// Released when initial ranges are issued in the first call to GetNext().
   CountingBarrier ranges_issued_barrier_{1};
 
-  /// Thread group for all scanner worker threads
-  ThreadGroup scanner_threads_;
-
-  /// Outgoing row batches queue. Row batches are produced asynchronously by the scanner
-  /// threads and consumed by the main thread.
-  boost::scoped_ptr<RowBatchQueue> materialized_row_batches_;
-
-  /// Maximum size of materialized_row_batches_.
-  int max_materialized_row_batches_;
-
   /// Lock protects access between scanner thread and main query thread (the one calling
   /// GetNext()) for all fields below.  If this lock and any other locks needs to be taken
   /// together, this lock must be taken first.
@@ -145,37 +135,10 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// -1 if no callback is registered.
   int thread_avail_cb_id_ = -1;
 
-  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
-  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
-  /// are generally cpu bound so there is no benefit in spinning up more threads than
-  /// the number of cores.
-  int max_num_scanner_threads_;
-
-  // MemTracker for queued row batches. Initialized in Prepare(). Owned by RuntimeState.
-  MemTracker* row_batches_mem_tracker_ = nullptr;
-
   // Number of times scanner threads were not created because of reservation increase
   // being denied.
   RuntimeProfile::Counter* scanner_thread_reservations_denied_counter_ = nullptr;
 
-  /// The number of row batches enqueued into the row batch queue.
-  RuntimeProfile::Counter* row_batches_enqueued_ = nullptr;
-
-  /// The total bytes of row batches enqueued into the row batch queue.
-  RuntimeProfile::Counter* row_batch_bytes_enqueued_ = nullptr;
-
-  /// The wait time for fetching a row batch from the row batch queue.
-  RuntimeProfile::Counter* row_batches_get_timer_ = nullptr;
-
-  /// The wait time for enqueuing a row batch into the row batch queue.
-  RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;
-
-  /// Maximum capacity of the row batch queue.
-  RuntimeProfile::HighWaterMarkCounter* row_batches_max_capacity_ = nullptr;
-
-  /// Peak memory consumption of the materialized batch queue. Updated in Close().
-  RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;
-
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
   void ThreadTokenAvailableCb(ThreadResourcePool* pool);
@@ -208,7 +171,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   void ReturnReservationFromScannerThread(const boost::unique_lock<boost::mutex>& lock,
       int64_t bytes);
 
-  /// Checks for eos conditions and returns batches from materialized_row_batches_.
+  /// Checks for eos conditions and returns batches from the row batch queue.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
       WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 3e4c223..7ca9398 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -770,8 +770,7 @@ Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
 Status HdfsTextScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
 
-  parse_delimiter_timer_ = ADD_CHILD_TIMER(scan_node_->runtime_profile(),
-      "DelimiterParseTime", ScanNode::SCANNER_THREAD_TOTAL_WALLCLOCK_TIME);
+  parse_delimiter_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DelimiterParseTime");
 
   // Allocate the scratch space for two pass parsing.  The most fields we can go
   // through in one parse pass is the batch size (tuples) * the number of fields per tuple

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 16910c5..8ec87c6 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -39,12 +39,12 @@ class KuduScanNodeBase : public ScanNode {
   KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
   ~KuduScanNodeBase();
 
-  virtual Status Prepare(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
-
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
+      override = 0;
  protected:
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+  virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
   /// Returns the total number of scan tokens
   int NumScanTokens() { return scan_tokens_.size(); }

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/kudu-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 22f00e7..5dfa5b1 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -75,7 +75,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   bool scanner_eos = false;
   RETURN_IF_ERROR(scanner_->GetNext(row_batch, &scanner_eos));
   if (scanner_eos) {
-    scan_ranges_complete_counter()->Add(1);
+    scan_ranges_complete_counter_->Add(1);
     scan_token_ = nullptr;
   }
   scanner_->KeepKuduScannerAlive();

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 19f1d5c..48816f9 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -24,13 +24,11 @@
 #include "exprs/scalar-expr.h"
 #include "gutil/gscoped_ptr.h"
 #include "runtime/fragment-instance-state.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
-#include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
@@ -43,38 +41,25 @@ namespace impala {
 KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
     : KuduScanNodeBase(pool, tnode, descs),
-      num_active_scanners_(0),
       done_(false),
-      max_num_scanner_threads_(CpuInfo::num_cores()),
       thread_avail_cb_id_(-1) {
   DCHECK(KuduIsAvailable());
-
-  int max_row_batches = FLAGS_kudu_max_row_batches;
-  if (max_row_batches <= 0) {
-    // TODO: See comment on hdfs-scan-node.
-    // This value is built the same way as it assumes that the scan node runs co-located
-    // with a Kudu tablet server and that the tablet server is using disks similarly as
-    // a datanode would.
-    max_row_batches = 10 * (DiskInfo::num_disks() + io::DiskIoMgr::REMOTE_NUM_DISKS);
-  }
-  materialized_row_batches_.reset(new RowBatchQueue(max_row_batches));
 }
 
 KuduScanNode::~KuduScanNode() {
   DCHECK(is_closed());
 }
 
+Status KuduScanNode::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(KuduScanNodeBase::Prepare(state));
+  thread_state_.Prepare(this);
+  return Status::OK();
+}
+
 Status KuduScanNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(KuduScanNodeBase::Open(state));
-
-  num_scanner_threads_started_counter_ =
-      ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
-
-  if (state->query_options().num_scanner_threads > 0) {
-    max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
-  }
-  DCHECK_GT(max_num_scanner_threads_, 0);
+  thread_state_.Open(this, FLAGS_kudu_max_row_batches);
 
   if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
 
@@ -100,7 +85,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   }
 
   *eos = false;
-  unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
+  unique_ptr<RowBatch> materialized_batch = thread_state_.batch_queue()->GetBatch();
   if (materialized_batch != NULL) {
     row_batch->AcquireState(materialized_batch.get());
     num_rows_returned_ += row_batch->num_rows();
@@ -133,9 +118,7 @@ void KuduScanNode::Close(RuntimeState* state) {
 
   SetDone();
 
-  scanner_threads_.JoinAll();
-  DCHECK_EQ(num_active_scanners_, 0);
-  materialized_row_batches_->Cleanup();
+  thread_state_.Close();
   KuduScanNodeBase::Close(state);
 }
 
@@ -144,12 +127,12 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
     unique_lock<mutex> lock(lock_);
     // All done or all tokens are assigned.
     if (done_ || !HasScanToken()) break;
-    bool first_thread = num_active_scanners_ == 0;
+    bool first_thread = thread_state_.GetNumActive() == 0;
 
     // Check if we can get a token. We need at least one thread to run.
     if (first_thread) {
       pool->AcquireThreadToken();
-    } else if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_
+    } else if (thread_state_.GetNumActive() >= thread_state_.max_num_scanner_threads()
         || !pool->TryAcquireThreadToken()) {
       break;
     }
@@ -157,7 +140,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
     string name = Substitute(
         "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
-        num_scanner_threads_started_counter_->value());
+        thread_state_.GetNumStarted());
 
     // Reserve the first token so no other thread picks it up.
     const string* token = GetNextScanToken();
@@ -182,10 +165,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
       break;
     }
     // Thread successfully started
-    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
-    ++num_active_scanners_;
-    VLOG_RPC << "Thread started: " << name;
-    scanner_threads_.AddThread(move(t));
+    thread_state_.AddThread(move(t));
   }
 }
 
@@ -199,21 +179,21 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
     RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos));
     while (!done_) {
       scanner->KeepKuduScannerAlive();
-      if (materialized_row_batches_->BlockingPutWithTimeout(move(row_batch), 1000000)) {
+      if (thread_state_.EnqueueBatchWithTimeout(&row_batch, 1000000)) {
         break;
       }
       // Make sure that we still own the RowBatch if BlockingPutWithTimeout() timed out.
       DCHECK(row_batch != nullptr);
     }
   }
-  if (eos) scan_ranges_complete_counter()->Add(1);
+  if (eos) scan_ranges_complete_counter_->Add(1);
   return Status::OK();
 }
 
 void KuduScanNode::RunScannerThread(
     bool first_thread, const string& name, const string* initial_token) {
   DCHECK(initial_token != nullptr);
-  SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
+  SCOPED_THREAD_COUNTER_MEASUREMENT(thread_state_.thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   KuduScanner scanner(this, runtime_state_);
 
@@ -248,7 +228,7 @@ void KuduScanNode::RunScannerThread(
       status_ = status;
       SetDoneInternal();
     }
-    if (--num_active_scanners_ == 0) SetDoneInternal();
+    if (thread_state_.DecrementNumActive()) SetDoneInternal();
   }
 
   // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which
@@ -260,7 +240,7 @@ void KuduScanNode::RunScannerThread(
 void KuduScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
-  materialized_row_batches_->Shutdown();
+  thread_state_.Shutdown();
 }
 
 void KuduScanNode::SetDone() {

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 651c0f5..3b5ca75 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -24,7 +24,6 @@
 
 #include "exec/kudu-scan-node-base.h"
 #include "gutil/gscoped_ptr.h"
-#include "util/thread.h"
 
 namespace impala {
 
@@ -42,19 +41,19 @@ class KuduScanNode : public KuduScanNodeBase {
 
   ~KuduScanNode();
 
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual void Close(RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state) override;
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual void Close(RuntimeState* state) override;
 
  private:
   friend class KuduScanner;
 
-  // Outgoing row batches queue. Row batches are produced asynchronously by the scanner
-  // threads and consumed by the main thread.
-  boost::scoped_ptr<RowBatchQueue> materialized_row_batches_;
+  ScannerThreadState thread_state_;
 
-  /// Protects access to state accessed by scanner threads, such as 'status_' or
-  /// 'num_active_scanners_'.
+  /// Protects access to state accessed by scanner threads, such as 'status_' and 'done_'.
+  /// Writers to 'done_' must hold 'lock_' to prevent races when updating, but readers can
+  /// read without holding 'lock_', provided they can tolerate stale reads.
   boost::mutex lock_;
 
   /// The current status of the scan, set to non-OK if any problems occur, e.g. if an
@@ -62,10 +61,6 @@ class KuduScanNode : public KuduScanNodeBase {
   /// Protected by lock_
   Status status_;
 
-  /// Number of active running scanner threads.
-  /// Protected by lock_
-  int num_active_scanners_;
-
   /// Set to true when the scan is complete (either because all scan tokens have been
   /// processed, the limit was reached or some error occurred).
   /// Protected by lock_. It is safe to do optimistic reads without taking lock_ in
@@ -74,15 +69,6 @@ class KuduScanNode : public KuduScanNodeBase {
   /// contention.
   volatile bool done_;
 
-  /// Thread group for all scanner worker threads
-  ThreadGroup scanner_threads_;
-
-  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
-  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
-  /// are generally cpu bound so there is no benefit in spinning up more threads than
-  /// the number of cores.
-  int max_num_scanner_threads_;
-
   /// The id of the callback added to the thread resource manager when a thread
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
@@ -105,7 +91,7 @@ class KuduScanNode : public KuduScanNodeBase {
       bool first_thread, const std::string& name, const std::string* initial_token);
 
   /// Processes a single scan token. Row batches are fetched using 'scanner' and enqueued
-  /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or
+  /// in the row batch queue until the scanner reports eos, an error occurs, or
   /// the limit is reached.
   Status ProcessScanToken(KuduScanner* scanner, const std::string& scan_token);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 8a18a0c..01aa269 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -20,8 +20,11 @@
 #include <boost/bind.hpp>
 
 #include "exprs/scalar-expr.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
+#include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
@@ -29,6 +32,15 @@
 DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
     "that a scan node will wait for expected runtime filters to arrive.");
 
+// The maximum capacity of batch_queue_ per scanner thread that can
+// be created. This is multiplied by 'max_num_scanner_threads' to get an upper
+// bound on the queue size. This reduces the queue size on systems with many disks
+// and makes the num_scanner_threads query option more effective at reducing memory
+// consumption. For now, make this relatively high. We should consider lowering
+// this or using a better heuristic (e.g. based on queued memory).
+DEFINE_int32_hidden(max_queued_row_batches_per_scanner_thread, 5,
+    "(Advanced) the maximum number of queued row batches per scanner thread.");
+
 using boost::algorithm::join;
 
 namespace impala {
@@ -51,6 +63,8 @@ const string ScanNode::SCANNER_THREAD_TOTAL_WALLCLOCK_TIME =
     "ScannerThreadsTotalWallClockTime";
 const string ScanNode::AVERAGE_SCANNER_THREAD_CONCURRENCY =
     "AverageScannerThreadConcurrency";
+const string ScanNode::PEAK_SCANNER_THREAD_CONCURRENCY =
+    "PeakScannerThreadConcurrency";
 const string ScanNode::AVERAGE_HDFS_READ_THREAD_CONCURRENCY =
     "AverageHdfsReadThreadConcurrency";
 const string ScanNode::NUM_SCANNER_THREADS_STARTED =
@@ -96,30 +110,27 @@ Status ScanNode::Prepare(RuntimeState* state) {
   runtime_state_ = state;
   RETURN_IF_ERROR(ExecNode::Prepare(state));
 
-  scanner_thread_counters_ =
-      ADD_THREAD_COUNTERS(runtime_profile(), SCANNER_THREAD_COUNTERS_PREFIX);
-  bytes_read_counter_ =
-      ADD_COUNTER(runtime_profile(), BYTES_READ_COUNTER, TUnit::BYTES);
-  bytes_read_timeseries_counter_ = ADD_TIME_SERIES_COUNTER(runtime_profile(),
-      BYTES_READ_COUNTER, bytes_read_counter_);
   rows_read_counter_ =
       ADD_COUNTER(runtime_profile(), ROWS_READ_COUNTER, TUnit::UNIT);
-  collection_items_read_counter_ =
-      ADD_COUNTER(runtime_profile(), COLLECTION_ITEMS_READ_COUNTER, TUnit::UNIT);
-  total_throughput_counter_ = runtime_profile()->AddRateCounter(
-      TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
-  materialize_tuple_timer_ = ADD_CHILD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER,
-      SCANNER_THREAD_TOTAL_WALLCLOCK_TIME);
+  materialize_tuple_timer_ = ADD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER);
 
   DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
   for (int i = 0; i < filter_exprs_.size(); ++i) {
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_,
         expr_perm_pool(), expr_results_pool(), &filter_ctxs_[i].expr_eval));
   }
-
   return Status::OK();
 }
 
+void ScanNode::AddBytesReadCounters() {
+  bytes_read_counter_ =
+      ADD_COUNTER(runtime_profile(), BYTES_READ_COUNTER, TUnit::BYTES);
+  bytes_read_timeseries_counter_ = ADD_TIME_SERIES_COUNTER(runtime_profile(),
+      BYTES_READ_COUNTER, bytes_read_counter_);
+  total_throughput_counter_ = runtime_profile()->AddRateCounter(
+      TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
+}
+
 Status ScanNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
 
@@ -127,7 +138,6 @@ Status ScanNode::Open(RuntimeState* state) {
   for (FilterContext& ctx : filter_ctxs_) {
     RETURN_IF_ERROR(ctx.expr_eval->Open(state));
   }
-
   return Status::OK();
 }
 
@@ -178,4 +188,121 @@ bool ScanNode::WaitForRuntimeFilters() {
   return false;
 }
 
+void ScanNode::ScannerThreadState::Prepare(ScanNode* parent) {
+  RuntimeProfile* profile = parent->runtime_profile();
+  row_batches_mem_tracker_ = parent->runtime_state_->obj_pool()->Add(new MemTracker(
+        -1, "Queued Batches", parent->mem_tracker(), false));
+
+  thread_counters_ = ADD_THREAD_COUNTERS(profile, SCANNER_THREAD_COUNTERS_PREFIX);
+  num_threads_started_ = ADD_COUNTER(profile, NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
+  row_batches_enqueued_ =
+      ADD_COUNTER(profile, "RowBatchesEnqueued", TUnit::UNIT);
+  row_batch_bytes_enqueued_ =
+      ADD_COUNTER(profile, "RowBatchBytesEnqueued", TUnit::BYTES);
+  row_batches_get_timer_ = ADD_TIMER(profile, "RowBatchQueueGetWaitTime");
+  row_batches_put_timer_ = ADD_TIMER(profile, "RowBatchQueuePutWaitTime");
+  row_batches_max_capacity_ =
+      profile->AddHighWaterMarkCounter("RowBatchQueueCapacity", TUnit::UNIT);
+  row_batches_peak_mem_consumption_ =
+      ADD_COUNTER(profile, "RowBatchQueuePeakMemoryUsage", TUnit::BYTES);
+}
+
+void ScanNode::ScannerThreadState::Open(
+    ScanNode* parent, int64_t max_row_batches_override) {
+  RuntimeState* state = parent->runtime_state_;
+  max_num_scanner_threads_ = CpuInfo::num_cores();
+  if (state->query_options().num_scanner_threads > 0) {
+    max_num_scanner_threads_ = state->query_options().num_scanner_threads;
+  }
+  DCHECK_GT(max_num_scanner_threads_, 0);
+
+  int max_row_batches = max_row_batches_override;
+  if (max_row_batches_override <= 0) {
+    // Legacy heuristic to determine the size, based on the idea that more disks means a
+    // faster producer. Also used for Kudu under the assumption that the scan runs
+    // co-located with a Kudu tablet server and that the tablet server is using disks
+    // similarly as a datanode would.
+    // TODO: IMPALA-7096: re-evaluate this heuristic to get a tighter bound on memory
+    // consumption; we could do something better.
+    max_row_batches = max(1, min(
+        10 * (DiskInfo::num_disks() + io::DiskIoMgr::REMOTE_NUM_DISKS),
+        max_num_scanner_threads_ * FLAGS_max_queued_row_batches_per_scanner_thread));
+  }
+  if (state->query_options().__isset.mt_dop && state->query_options().mt_dop > 0) {
+    // To avoid a significant memory increase when running the multithreaded scans
+    // with mt_dop > 0 (if mt_dop is not supported for this scan), adjust the maximum
+    // number of queued row batches per scan instance based on MT_DOP. The queue
+    // capacity is at least 2 to allow for some parallelism between the producer and
+    // the consumer.
+    max_row_batches = max(2, max_row_batches / state->query_options().mt_dop);
+  }
+  VLOG_QUERY << "Max row batch queue size for scan node '" << parent->id()
+      << "' in fragment instance '" << PrintId(state->fragment_instance_id())
+      << "': " << max_row_batches;
+  batch_queue_.reset(new RowBatchQueue(max_row_batches));
+
+  // Start measuring the scanner thread concurrency only once the node is opened.
+  average_concurrency_ = parent->runtime_profile()->AddSamplingCounter(
+      AVERAGE_SCANNER_THREAD_CONCURRENCY, [&num_active=num_active_] () {
+        return num_active.Load();
+      });
+  peak_concurrency_ = parent->runtime_profile()->AddHighWaterMarkCounter(
+      PEAK_SCANNER_THREAD_CONCURRENCY, TUnit::UNIT);
+}
+
+void ScanNode::ScannerThreadState::AddThread(unique_ptr<Thread> thread) {
+  VLOG_RPC << "Thread started: " << thread->name();
+  COUNTER_ADD(num_threads_started_, 1);
+  num_active_.Add(1);
+  peak_concurrency_->Add(1);
+  scanner_threads_.AddThread(move(thread));
+}
+
+bool ScanNode::ScannerThreadState::DecrementNumActive() {
+  peak_concurrency_->Add(-1);
+  return num_active_.Add(-1) == 0;
+}
+
+void ScanNode::ScannerThreadState::EnqueueBatch(
+    unique_ptr<RowBatch> row_batch) {
+  // Only need to count tuple_data_pool() bytes since after IMPALA-5307, no buffers are
+  // returned from the scan node.
+  int64_t bytes = row_batch->tuple_data_pool()->total_reserved_bytes();
+  row_batch->SetMemTracker(row_batches_mem_tracker_);
+  batch_queue_->AddBatch(move(row_batch));
+  COUNTER_ADD(row_batches_enqueued_, 1);
+  COUNTER_ADD(row_batch_bytes_enqueued_, bytes);
+}
+
+bool ScanNode::ScannerThreadState::EnqueueBatchWithTimeout(
+    unique_ptr<RowBatch>* row_batch, int64_t timeout_micros) {
+  // Only need to count tuple_data_pool() bytes since after IMPALA-5307, no buffers are
+  // returned from the scan node.
+  int64_t bytes = (*row_batch)->tuple_data_pool()->total_reserved_bytes();
+  // Transfer memory ownership before enqueueing. If the caller retries, this transfer
+  // is idempotent.
+  (*row_batch)->SetMemTracker(row_batches_mem_tracker_);
+  if (!batch_queue_->BlockingPutWithTimeout(move(*row_batch), timeout_micros)) {
+    return false;
+  }
+  COUNTER_ADD(row_batches_enqueued_, 1);
+  COUNTER_ADD(row_batch_bytes_enqueued_, bytes);
+  return true;
+}
+
+void ScanNode::ScannerThreadState::Shutdown() {
+  if (batch_queue_ != nullptr) batch_queue_->Shutdown();
+}
+
+void ScanNode::ScannerThreadState::Close() {
+  scanner_threads_.JoinAll();
+  DCHECK_EQ(num_active_.Load(), 0) << "There should be no active threads";
+  if (batch_queue_ != nullptr) {
+    row_batches_put_timer_->Set(batch_queue_->total_put_wait_time());
+    row_batches_get_timer_->Set(batch_queue_->total_get_wait_time());
+    row_batches_peak_mem_consumption_->Set(row_batches_mem_tracker_->peak_consumption());
+    batch_queue_->Cleanup();
+  }
+  if (row_batches_mem_tracker_ != nullptr) row_batches_mem_tracker_->Close();
+}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 87d37c2..63bb59b 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -23,67 +23,80 @@
 #include "exec/exec-node.h"
 #include "exec/filter-context.h"
 #include "util/runtime-profile.h"
+#include "util/thread.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
 namespace impala {
 
 class TScanRange;
 
-/// Abstract base class of all scan nodes; introduces SetScanRange().
-//
+/// Abstract base class of all scan nodes. Subclasses support different storage layers
+/// and different threading models.
+///
 /// Includes ScanNode common counters:
-///   BytesRead - total bytes read by this scan node. Provided as a counter as well
-///     as a time series that samples the counter.
-//
-///   TotalRawReadTime - it measures the total time spent in underlying reads.
-///     For HDFS files, this is the time in the disk-io-mgr's reading threads for
-///     this node. For example, if we have 3 reading threads and each spent
-///     1 sec, this counter will report 3 sec.
-///     For HBase, this is the time spent in the region server.
-//
-///   TotalReadThroughput - BytesRead divided by the total time spent in this node
-///     (from Open to Close). For IO bounded queries, this should be very close to the
-///     total throughput of all the disks.
-//
-///   PerDiskRawHdfsThroughput - the read throughput for each disk. If all the data reside
-///     on disk, this should be the read throughput the disk, regardless of whether the
-///     query is IO bounded or not.
-//
-///   NumDisksAccessed - number of disks accessed.
-//
-///   AverageScannerThreadConcurrency - the average number of active scanner threads. A
-///     scanner thread is considered active if it is not blocked by IO. This number would
-///     be low (less than 1) for IO-bound queries. For cpu-bound queries, this number
-///     would be close to the max scanner threads allowed.
-//
-///   AverageHdfsReadThreadConcurrency - the average number of active hdfs reading threads
-///     reading for this scan node. For IO bound queries, this should be close to the
-///     number of disk.
-//
-///   Hdfs Read Thread Concurrency Bucket - the bucket counting (%) of hdfs read thread
-///     concurrency.
-//
+///   BytesRead - total bytes read from disk by this scan node. Provided as a counter
+///     as well as a time series that samples the counter. Only implemented for scan node
+///     subclasses that expose the bytes read, e.g. HDFS and HBase.
+///
+///   TotalReadThroughput - BytesRead divided by the total wall clock time that this scan
+///     was executing (from Open() to Close()). This gives the aggregate rate that data
+///     is read from disks. If this is the only scan executing, ideally this will
+///     approach the maximum bandwidth supported by the disks.
+///
+///   RowsRead - number of top-level rows/tuples read from the storage layer, including
+///     those discarded by predicate evaluation. Used for all types of scans.
+///
+///   CollectionItemsRead - total number of nested collection items read by the scan.
+///     Only created for scans (e.g. Parquet) that support nested types.
+///
+///   ScanRangesComplete - number of scan ranges completed. Initialized for scans that
+///     have a concept of "scan range".
+///
+///   MaterializeTupleTime - wall clock time spent materializing tuples and evaluating
+///     predicates.
+///
+/// The following counters are specific to multithreaded scan node implementations:
+///
+///   PeakScannerThreadConcurrency - the peak number of scanner threads executing at any
+///     one time. Present only for multithreaded scan nodes.
+///
+///   AverageScannerThreadConcurrency - the average number of scanner threads executing
+///     between Open() and the time when the scan completes. Present only for
+///     multithreaded scan nodes.
+///
 ///   NumScannerThreadsStarted - the number of scanner threads started for the duration
 ///     of the ScanNode. This is at most the number of scan ranges but should be much
 ///     less since a single scanner thread will likely process multiple scan ranges.
-//
-///   ScanRangesComplete - number of scan ranges completed
-//
-///   MaterializeTupleTime - time spent in creating in-memory tuple format
-//
-///   ScannerThreadsTotalWallClockTime - total time spent in all scanner threads.
-//
+///     This is *not* the same as peak scanner thread concurrency because the number of
+///     scanner threads can fluctuate during execution of the scan.
+///
+///   ScannerThreadsTotalWallClockTime - total wall clock time spent in all scanner
+///     threads.
+///
 ///   ScannerThreadsUserTime, ScannerThreadsSysTime,
 ///   ScannerThreadsVoluntaryContextSwitches, ScannerThreadsInvoluntaryContextSwitches -
 ///     these are aggregated counters across all scanner threads of this scan node. They
 ///     are taken from getrusage. See RuntimeProfile::ThreadCounters for details.
-//
+///
+///   RowBatchesEnqueued, RowBatchBytesEnqueued - Number of row batches and bytes enqueued
+///     in the scan node's output queue.
+///
+///   RowBatchQueueGetWaitTime - Wall clock time that the fragment execution thread spent
+///     blocked waiting for row batches to be added to the scan node's output queue.
+///
+///   RowBatchQueuePutWaitTime - Wall clock time that the scanner threads spent blocked
+///     waiting for space in the scan node's output queue when it is full.
+///
+///   RowBatchQueueCapacity - capacity in batches of the scan node's output queue.
+///
+///   RowBatchQueuePeakMemoryUsage - peak memory consumption of row batches enqueued in
+///     the scan node's output queue.
+///
 class ScanNode : public ExecNode {
  public:
   ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
     : ExecNode(pool, tnode, descs),
-      scan_range_params_(NULL),
-      active_scanner_thread_counter_(TUnit::UNIT, 0) {}
+      scan_range_params_(NULL) {}
 
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT;
   virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT;
@@ -107,29 +120,9 @@ class ScanNode : public ExecNode {
   RuntimeProfile::Counter* collection_items_read_counter() const {
     return collection_items_read_counter_;
   }
-  RuntimeProfile::Counter* read_timer() const { return read_timer_; }
-  RuntimeProfile::Counter* open_file_timer() const { return open_file_timer_; }
-  RuntimeProfile::Counter* total_throughput_counter() const {
-    return total_throughput_counter_;
-  }
-  RuntimeProfile::Counter* per_read_thread_throughput_counter() const {
-    return per_read_thread_throughput_counter_;
-  }
   RuntimeProfile::Counter* materialize_tuple_timer() const {
     return materialize_tuple_timer_;
   }
-  RuntimeProfile::Counter* scan_ranges_complete_counter() const {
-    return scan_ranges_complete_counter_;
-  }
-  RuntimeProfile::ThreadCounters* scanner_thread_counters() const {
-    return scanner_thread_counters_;
-  }
-  RuntimeProfile::Counter& active_scanner_thread_counter() {
-    return active_scanner_thread_counter_;
-  }
-  RuntimeProfile::Counter* average_scanner_thread_concurrency() const {
-    return average_scanner_thread_concurrency_;
-  }
 
   /// names of ScanNode common counters
   static const std::string BYTES_READ_COUNTER;
@@ -146,6 +139,7 @@ class ScanNode : public ExecNode {
   static const std::string SCANNER_THREAD_COUNTERS_PREFIX;
   static const std::string SCANNER_THREAD_TOTAL_WALLCLOCK_TIME;
   static const std::string AVERAGE_SCANNER_THREAD_CONCURRENCY;
+  static const std::string PEAK_SCANNER_THREAD_CONCURRENCY;
   static const std::string AVERAGE_HDFS_READ_THREAD_CONCURRENCY;
   static const std::string NUM_SCANNER_THREADS_STARTED;
 
@@ -159,38 +153,33 @@ class ScanNode : public ExecNode {
   /// The scan ranges this scan node is responsible for. Not owned.
   const std::vector<TScanRangeParams>* scan_range_params_;
 
-  RuntimeProfile::Counter* bytes_read_counter_; // # bytes read from the scanner
-  /// Time series of the bytes_read_counter_
-  RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_;
-  /// # top-level rows/tuples read from the scanner
-  /// (including those discarded by EvalConjucts())
-  RuntimeProfile::Counter* rows_read_counter_;
+  /// Total bytes read from the scanner. Initialized by calling AddBytesReadCounters()
+  /// in subclasses that track bytes read, including HDFS and HBase.
+  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
+
+  /// Time series of 'bytes_read_counter_', initialized at the same time.
+  RuntimeProfile::TimeSeriesCounter* bytes_read_timeseries_counter_ = nullptr;
+
+  /// Wall based aggregate read throughput [bytes/sec]. Depends on 'bytes_read_counter_'
+  /// and initialized at the same time.
+  RuntimeProfile::Counter* total_throughput_counter_ = nullptr;
+
+  /// # top-level rows/tuples read from the scanner, including those discarded by
+  /// EvalConjuncts(). Used for all types of scans.
+  RuntimeProfile::Counter* rows_read_counter_ = nullptr;
+
   /// # items the scanner read into CollectionValues. For example, for schema
   /// array<struct<B: INT, array<C: INT>> and tuple
   /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
-  RuntimeProfile::Counter* collection_items_read_counter_;
-  RuntimeProfile::Counter* read_timer_; // total read time
-  RuntimeProfile::Counter* open_file_timer_; // total time spent opening file handles
-  /// Wall based aggregate read throughput [bytes/sec]
-  RuntimeProfile::Counter* total_throughput_counter_;
-  /// Per thread read throughput [bytes/sec]
-  RuntimeProfile::Counter* per_read_thread_throughput_counter_;
-  RuntimeProfile::Counter* num_disks_accessed_counter_;
-  RuntimeProfile::Counter* materialize_tuple_timer_;  // time writing tuple slots
-  RuntimeProfile::Counter* scan_ranges_complete_counter_;
-  /// Aggregated scanner thread counters
-  RuntimeProfile::ThreadCounters* scanner_thread_counters_;
-
-  /// The number of scanner threads currently running.
-  RuntimeProfile::Counter active_scanner_thread_counter_;
-
-  /// Average number of active scanner threads
-  /// This should be created in Open and stopped when all the scanner threads are done.
-  RuntimeProfile::Counter* average_scanner_thread_concurrency_;
-
-  /// Cumulative number of scanner threads created during the scan. Some may be created
-  /// and then destroyed, so this can exceed the peak number of threads.
-  RuntimeProfile::Counter* num_scanner_threads_started_counter_;
+  /// Initialized by subclasses that support scanning nested types.
+  RuntimeProfile::Counter* collection_items_read_counter_ = nullptr;
+
+  /// Total time writing tuple slots. Used for all types of scans.
+  RuntimeProfile::Counter* materialize_tuple_timer_ = nullptr;
+
+  /// Total number of scan ranges completed. Initialized in subclasses that have a
+  /// concept of "scan range", including HDFS and Kudu.
+  RuntimeProfile::Counter* scan_ranges_complete_counter_ = nullptr;
 
   /// Expressions to evaluate the input rows for filtering against runtime filters.
   std::vector<ScalarExpr*> filter_exprs_;
@@ -200,13 +189,126 @@ class ScanNode : public ExecNode {
   /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'.
   std::vector<FilterContext> filter_ctxs_;
 
+  /// Initializes 'bytes_read_counter_', 'bytes_read_timeseries_counter_' and
+  /// 'total_throughput_counter_'
+  void AddBytesReadCounters();
+
   /// Waits for runtime filters to arrive, checking every 20ms. Max wait time is specified
   /// by the 'runtime_filter_wait_time_ms' flag, which is overridden by the query option
   /// of the same name. Returns true if all filters arrived within the time limit (as
   /// measured from the time of RuntimeFilterBank::RegisterFilter()), false otherwise.
   bool WaitForRuntimeFilters();
-};
 
+  /// Additional state only used by multi-threaded scan node implementations.
+  /// The lifecycle is as follows:
+  /// 1. Prepare() is called.
+  /// 2. Open() is called.
+  /// 3. Other methods can be called.
+  /// 4. Shutdown() is called to prevent new batches being added to the queue.
+  /// 5. Close() is called to release all resources.
+  class ScannerThreadState {
+   public:
+    /// Called from *ScanNode::Prepare() to initialize counters and MemTracker.
+    void Prepare(ScanNode* parent);
+
+    /// Called from *ScanNode::Open() to create the row batch queue and start periodic
+    /// counters running. 'max_row_batches_override' determines the size of the row
+    /// batch queue if >= 0. Otherwise the size is automatically determined.
+    void Open(ScanNode* parent, int64_t max_row_batches_override);
+
+    /// Called when no more batches need to be enqueued or dequeued. Shuts down the
+    /// queue. Thread-safe.
+    void Shutdown();
+
+    /// Waits for all scanner threads to finish and cleans up the queue. Called from
+    /// *ScanNode::Close(). No other methods can be called after this. Not thread-safe.
+    void Close();
+
+    /// Add a new scanner thread to the thread group. Not thread-safe: only one thread
+    /// should call AddThread() at a time.
+    void AddThread(std::unique_ptr<Thread> thread);
+
+    /// Get the number of active scanner threads. Thread-safe.
+    int32_t GetNumActive() const { return num_active_.Load(); }
+
+    /// Get the number of started scanner threads. Thread-safe.
+    int32_t GetNumStarted() const { return num_threads_started_->value(); }
+
+    /// Called from a scanner thread that is exiting to decrement the number of active
+    /// scanner threads. Returns true if this was the last thread to exit. Thread-safe.
+    bool DecrementNumActive();
+
+    /// Adds a materialized row batch for the scan node. This is called from scanner
+    /// threads. This function will block if the row batch queue is full. Thread-safe.
+    void EnqueueBatch(std::unique_ptr<RowBatch> row_batch);
+
+    /// Adds a materialized row batch for the scan node. This is called from scanner
+    /// threads. This function will block for up to timeout_micros if the row batch
+    /// queue is full. Returns true and takes ownership of '*row_batch' if the batch
+    /// was successfully enqueued. Returns false, without taking ownership of
+    /// '*row_batch', if the timeout expired or the queue was shut down before the
+    /// batch could be enqueued. Thread-safe.
+    bool EnqueueBatchWithTimeout(std::unique_ptr<RowBatch>* row_batch,
+        int64_t timeout_micros);
+
+    RowBatchQueue* batch_queue() { return batch_queue_.get(); }
+    RuntimeProfile::ThreadCounters* thread_counters() const { return thread_counters_; }
+    int max_num_scanner_threads() const { return max_num_scanner_threads_; }
+
+   private:
+    /// Thread group for all scanner threads.
+    ThreadGroup scanner_threads_;
+
+    /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
+    /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
+    /// are generally cpu bound so there is no benefit in spinning up more threads than
+    /// the number of cores. Set in Open().
+    int max_num_scanner_threads_ = 0;
+
+    /// MemTracker for queued row batches. Initialized in Prepare(). Owned by RuntimeState.
+    MemTracker* row_batches_mem_tracker_ = nullptr;
+
+    /// Outgoing row batches queue. Row batches are produced asynchronously by the scanner
+    /// threads and consumed by the main fragment thread that calls GetNext() on the scan
+    /// node.
+    boost::scoped_ptr<RowBatchQueue> batch_queue_;
+
+    /// The number of scanner threads currently running.
+    AtomicInt32 num_active_{0};
+
+    /// Aggregated scanner thread CPU time counters.
+    RuntimeProfile::ThreadCounters* thread_counters_ = nullptr;
+
+    /// Average number of executing scanner threads.
+    /// This should be created in Open() and stopped when all the scanner threads are done.
+    RuntimeProfile::Counter* average_concurrency_ = nullptr;
+
+    /// Peak number of executing scanner threads.
+    RuntimeProfile::HighWaterMarkCounter* peak_concurrency_ = nullptr;
+
+    /// Cumulative number of scanner threads created during the scan. Some may be created
+    /// and then destroyed, so this can exceed the peak number of threads.
+    RuntimeProfile::Counter* num_threads_started_ = nullptr;
+
+    /// The number of row batches enqueued into the row batch queue.
+    RuntimeProfile::Counter* row_batches_enqueued_ = nullptr;
+
+    /// The total bytes of row batches enqueued into the row batch queue.
+    RuntimeProfile::Counter* row_batch_bytes_enqueued_ = nullptr;
+
+    /// The wait time for fetching a row batch from the row batch queue.
+    RuntimeProfile::Counter* row_batches_get_timer_ = nullptr;
+
+    /// The wait time for enqueuing a row batch into the row batch queue.
+    RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;
+
+    /// Maximum capacity of the row batch queue.
+    RuntimeProfile::HighWaterMarkCounter* row_batches_max_capacity_ = nullptr;
+
+    /// Peak memory consumption of the materialized batch queue. Updated in Close().
+    RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;
+  };
+};
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index e6fa489..af03a2b 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -29,7 +29,7 @@
 #include "codegen/llvm-codegen.h"
 #include "exec/plan-root-sink.h"
 #include "exec/exec-node.h"
-#include "exec/hdfs-scan-node-base.h"  // for PerVolumeStats
+#include "exec/hdfs-scan-node-base.h"
 #include "exec/exchange-node.h"
 #include "exec/scan-node.h"
 #include "runtime/exec-env.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 1dd90d5..8b07dc5 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -178,6 +178,11 @@ class BlockingQueue : public CacheLineAligned {
     return SizeLocked(write_lock);
   }
 
+  bool AtCapacity() const {
+    boost::unique_lock<boost::mutex> write_lock(put_lock_);  // Size is read under put_lock_.
+    return SizeLocked(write_lock) >= max_elements_;  // True once max_elements_ is reached.
+  }
+
   int64_t total_get_wait_time() const {
     // Hold lock to make sure the value read is consistent (i.e. no torn read).
     boost::lock_guard<boost::mutex> read_lock(get_lock_);

http://git-wip-us.apache.org/repos/asf/impala/blob/5d672457/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index b114e26..a76932c 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -127,6 +127,8 @@ class Thread {
   /// support retrieving the tid, returns Thread::INVALID_THREAD_ID.
   int64_t tid() const { return tid_; }
 
+  const std::string& name() const { return name_; }  // Reference into this Thread.
+
   static const int64_t INVALID_THREAD_ID = -1;
 
  private: