You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/08/31 07:04:42 UTC

[2/2] incubator-impala git commit: IMPALA-2831: Bound the number of scanner threads per scan node.

IMPALA-2831: Bound the number of scanner threads per scan node.

Our current code base allows a scan node to spin up to 3x as many
scanner threads as there are logical cpu cores. However, the
scanner threads are cpu bound, so there are diminishing returns
from starting more scanner threads than the number of logical cores.
In fact, it may be detrimental due to context switching overhead.

This change bounds the number of scanner threads spun up by a scan
node to the number of logical cpu cores unless the query option
'num_scanner_threads' is set. The total number of available thread
tokens is unchanged. With this change, the peak memory usage of the
following query on a single node Impala cluster running on a machine
with 8 logical cores drops from 287MB to 101MB:

select count(*) from tpch100_parquet.lineitem where l_orderkey > 20

The reduction comes mostly from having fewer outstanding IO buffers.
The IO for scan ranges is scheduled by the scanner threads which
pick them up, and at least one IO buffer of 8 to 16MB is associated
with each scan range. So the more threads we start up, the more
memory is consumed by the IO buffers, leading to higher peak
memory usage.

Change-Id: I191988ad18d6b4caf892fc967258823edcf9681f
Reviewed-on: http://gerrit.cloudera.org:8080/4174
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ecd78fb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ecd78fb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ecd78fb6

Branch: refs/heads/master
Commit: ecd78fb67d240485c96d60e745c674e0157d9263
Parents: df83090
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Aug 30 10:36:28 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Aug 31 06:58:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc | 29 +++++++++++++++++------------
 be/src/exec/hdfs-scan-node.h  |  6 ++++++
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 9ed78de..4846004 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -123,7 +123,8 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
       all_ranges_started_(false),
       counters_running_(false),
       thread_avail_cb_id_(-1),
-      rm_callback_id_(-1) {
+      rm_callback_id_(-1),
+      max_num_scanner_threads_(CpuInfo::num_cores()) {
   max_materialized_row_batches_ = FLAGS_max_row_batches;
   if (max_materialized_row_batches_ <= 0) {
     // TODO: This parameter has an U-shaped effect on performance: increasing the value
@@ -259,6 +260,8 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     }
 
     // Issue initial ranges for all file types.
+    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
+        matching_per_type_files[THdfsFileFormat::PARQUET]));
     RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
         matching_per_type_files[THdfsFileFormat::TEXT]));
     RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
@@ -267,8 +270,6 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
         matching_per_type_files[THdfsFileFormat::RC_FILE]));
     RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
         matching_per_type_files[THdfsFileFormat::AVRO]));
-    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::PARQUET]));
 
     // Release the scanner threads
     ranges_issued_barrier_.Notify();
@@ -730,9 +731,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
   // reservation before any ranges are issued.
   runtime_state_->resource_pool()->ReserveOptionalTokens(1);
   if (runtime_state_->query_options().num_scanner_threads > 0) {
-    runtime_state_->resource_pool()->set_max_quota(
-        runtime_state_->query_options().num_scanner_threads);
+    max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
+  DCHECK_GT(max_num_scanner_threads_, 0);
 
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
@@ -897,7 +898,7 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges
       runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
-  ThreadTokenAvailableCb(runtime_state_->resource_pool());
+  if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
   return Status::OK();
 }
 
@@ -983,8 +984,9 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   //  5. Don't start up a ScannerThread if materialized_row_batches_ is full since
   //     we are not scanner bound.
   //  6. Don't start up a thread if there isn't enough memory left to run it.
-  //  7. Don't start up if there are no thread tokens.
-  //  8. Don't start up if we are running too many threads for our vcore allocation
+  //  7. Don't start up more than maximum number of scanner threads configured.
+  //  8. Don't start up if there are no thread tokens.
+  //  9. Don't start up if we are running too many threads for our vcore allocation
   //  (unless the thread is reserved, in which case it has to run).
 
   // Case 4. We have not issued the initial ranges so don't start a scanner thread.
@@ -1000,7 +1002,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     unique_lock<mutex> lock(lock_);
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
-      active_scanner_thread_counter_.value() >= progress_.remaining()) {
+        active_scanner_thread_counter_.value() >= progress_.remaining()) {
       break;
     }
 
@@ -1011,11 +1013,14 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       break;
     }
 
-    // Case 7.
+    // Case 7 and 8.
     bool is_reserved = false;
-    if (!pool->TryAcquireThreadToken(&is_reserved)) break;
+    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
+        !pool->TryAcquireThreadToken(&is_reserved)) {
+      break;
+    }
 
-    // Case 8.
+    // Case 9.
     if (!is_reserved) {
       if (runtime_state_->query_resource_mgr() != NULL &&
           runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index ae38856..36e95b5 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -509,6 +509,12 @@ class HdfsScanNode : public ScanNode {
   /// -1 if no callback is registered.
   int32_t rm_callback_id_;
 
+  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
+  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
+  /// are generally cpu bound so there is no benefit in spinning up more threads than
+  /// the number of cores.
+  int max_num_scanner_threads_;
+
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);