You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/08/31 07:04:41 UTC

[1/2] incubator-impala git commit: IMPALA-3491: Use unique database fixture in test_join_queries.py.

Repository: incubator-impala
Updated Branches:
  refs/heads/master 33a35ea4f -> ecd78fb67


IMPALA-3491: Use unique database fixture in test_join_queries.py.

Testing: Ran the core/exhaustive tests on HDFS.

Change-Id: Ib639ff8a37dbf64840606f88badff8f2590587b6
Reviewed-on: http://gerrit.cloudera.org:8080/4169
Reviewed-by: Michael Brown <mi...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/df830901
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/df830901
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/df830901

Branch: refs/heads/master
Commit: df830901de416384c684b16a7ea80eb6483c35e3
Parents: 33a35ea
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Aug 29 10:11:14 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Aug 31 03:12:30 2016 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/semi-joins.test           | 96 ++++++++++++--------
 tests/query_test/test_join_queries.py           | 69 +++++---------
 2 files changed, 82 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df830901/testdata/workloads/functional-query/queries/QueryTest/semi-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/semi-joins.test b/testdata/workloads/functional-query/queries/QueryTest/semi-joins.test
index bc49669..9f497b7 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/semi-joins.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/semi-joins.test
@@ -1,7 +1,7 @@
 ====
 ---- QUERY
 # Verification that the setup of SEMIJOIN tables was correct. (SemiJoinTblA)
-SELECT * FROM functional.SemiJoinTblA
+SELECT * FROM SemiJoinTblA
 ---- RESULTS
 1,1,1
 1,1,10
@@ -15,7 +15,7 @@ INT, INT, INT
 ====
 ---- QUERY
 # Verification that the setup of SEMIJOIN tables was correct. (SemiJoinTblB)
-SELECT * FROM functional.SemiJoinTblB
+SELECT * FROM SemiJoinTblB
 ---- RESULTS
 1,1,1
 1,1,10
@@ -29,7 +29,9 @@ INT, INT, INT
 ====
 ---- QUERY
 # Testing left anti join
-select j.* from JoinTbl j left anti join DimTbl d on j.test_id = d.id
+select j.* from functional_parquet.JoinTbl j
+left anti join functional_parquet.DimTbl d
+on j.test_id = d.id
 ---- RESULTS
 1106,'Name6',94612,5000
 1106,'Name16',94612,5000
@@ -44,7 +46,9 @@ bigint, string, int, int
 ====
 ---- QUERY
 # Testing left anti join on string column
-select j.* from JoinTbl j left anti join DimTbl d on j.test_name = d.name
+select j.* from functional_parquet.JoinTbl j
+left anti join functional_parquet.DimTbl d
+on j.test_name = d.name
 ---- RESULTS
 1006,'Name16',94612,5000
 1106,'Name16',94612,5000
@@ -59,8 +63,11 @@ bigint, string, int, int
 ====
 ---- QUERY
 # Testing multi-way joins that include a left anti join
-select count(*) from JoinTbl j left anti join DimTbl d on j.test_id = d.id
-left outer join JoinTbl k on j.test_id = k.test_id
+select count(*) from functional_parquet.JoinTbl j
+left anti join functional_parquet.DimTbl d
+on j.test_id = d.id
+left outer join functional_parquet.JoinTbl k
+on j.test_id = k.test_id
 ---- RESULTS
 64
 ---- TYPES
@@ -68,8 +75,11 @@ BIGINT
 ====
 ---- QUERY
 # Testing multi-way joins that include only left anti joins
-select count(*) from JoinTbl j left anti join DimTbl d on j.test_id = d.id
-left anti join JoinTbl k on j.test_id = k.test_id
+select count(*) from functional_parquet.JoinTbl j
+left anti join functional_parquet.DimTbl d
+on j.test_id = d.id
+left anti join functional_parquet.JoinTbl k
+on j.test_id = k.test_id
 ---- RESULTS
 0
 ---- TYPES
@@ -77,8 +87,8 @@ BIGINT
 ====
 ---- QUERY
 # Regression test for IMPALA-1160. Proper handling of left anti joins with NULLs
-SELECT a.* FROM functional.SemiJoinTblA a
-LEFT ANTI JOIN functional.SemiJoinTblB b ON a.b = b.b
+SELECT a.* FROM SemiJoinTblA a
+LEFT ANTI JOIN SemiJoinTblB b ON a.b = b.b
 ---- RESULTS
 2,4,30
 1,3,10
@@ -89,8 +99,8 @@ INT, INT, INT
 ====
 ---- QUERY
 # Regression test for IMPALA-1175: Anti join query crashes Impala.
-SELECT * FROM functional.SemiJoinTblA A LEFT ANTI JOIN
-(SELECT count(*) `$c$2`, B.b `$c$1` FROM functional.SemiJoinTblB B GROUP BY B.b) `$a$1`
+SELECT * FROM SemiJoinTblA A LEFT ANTI JOIN
+(SELECT count(*) `$c$2`, B.b `$c$1` FROM SemiJoinTblB B GROUP BY B.b) `$a$1`
 ON A.a != `$a$1`.`$c$2` AND `$a$1`.`$c$1` = A.b
 ---- RESULTS
 1,2,10
@@ -104,7 +114,8 @@ INT, INT, INT
 ---- QUERY
 # Regression tests for IMPALA-1177: Incorrect results in query with ANTI JOIN on tinyint
 # column with nulls.
-SELECT COUNT(*) FROM functional.alltypesagg t1 LEFT ANTI JOIN functional.alltypes t2
+SELECT COUNT(*) FROM functional_parquet.alltypesagg t1
+LEFT ANTI JOIN functional_parquet.alltypes t2
 ON t2.tinyint_col = t1.tinyint_col
 ---- RESULTS
 2000
@@ -112,7 +123,8 @@ ON t2.tinyint_col = t1.tinyint_col
 BIGINT
 ====
 ---- QUERY
-SELECT COUNT(*) FROM functional.alltypesagg t1 LEFT ANTI JOIN functional.alltypes t2
+SELECT COUNT(*) FROM functional_parquet.alltypesagg t1
+LEFT ANTI JOIN functional_parquet.alltypes t2
 ON t2.tinyint_col = t1.tinyint_col and t1.day = 1
 ---- RESULTS
 10100
@@ -120,7 +132,8 @@ ON t2.tinyint_col = t1.tinyint_col and t1.day = 1
 BIGINT
 ====
 ---- QUERY
-SELECT COUNT(*) FROM functional.alltypesagg t1 LEFT ANTI JOIN functional.alltypes t2
+SELECT COUNT(*) FROM functional_parquet.alltypesagg t1
+LEFT ANTI JOIN functional_parquet.alltypes t2
 ON t2.tinyint_col = t1.tinyint_col and t2.month = 1
 ---- RESULTS
 2000
@@ -128,11 +141,13 @@ ON t2.tinyint_col = t1.tinyint_col and t2.month = 1
 ---- QUERY
 # Regression test for IMPALA-1204: ANTI JOIN crash running complicated query with right
 # joins.
-SELECT 1 FROM functional.alltypestiny t1 INNER JOIN functional.alltypestiny t2
+SELECT 1 FROM functional_parquet.alltypestiny t1
+INNER JOIN functional_parquet.alltypestiny t2
 ON t2.bigint_col = t1.tinyint_col AND t2.tinyint_col = t1.id
 LEFT ANTI JOIN
 (SELECT 1 `$c$2`, tt6.id `$c$1`
- FROM functional.alltypes tt5 RIGHT OUTER JOIN functional.alltypestiny tt6
+ FROM functional_parquet.alltypes tt5
+ RIGHT OUTER JOIN functional_parquet.alltypestiny tt6
  ON tt6.month = tt5.bigint_col) `$a$1` ON t2.int_col = `$a$1`.`$c$1`
 ---- RESULTS
 ---- TYPES
@@ -140,8 +155,9 @@ TINYINT
 ====
 ---- QUERY
 # left semi-join on bigint
-select d.*
-from DimTbl d left semi join JoinTbl j on (d.id = j.test_id)
+select d.* from functional_parquet.DimTbl d
+left semi join functional_parquet.JoinTbl j
+on (d.id = j.test_id)
 ---- RESULTS
 1001,'Name1',94611
 1002,'Name2',94611
@@ -154,8 +170,9 @@ bigint, string, int
 ====
 ---- QUERY
 # left semi-join on string
-select d.*
-from DimTbl d left semi join JoinTbl j on (j.test_name = d.name)
+select d.* from functional_parquet.DimTbl d
+left semi join functional_parquet.JoinTbl j
+on (j.test_name = d.name)
 ---- RESULTS
 1001,'Name1',94611
 1002,'Name2',94611
@@ -168,8 +185,9 @@ bigint, string, int
 ====
 ---- QUERY
 # left semi-join on int
-select d.*
-from DimTbl d left semi join JoinTbl j on (j.test_zip = d.zip)
+select d.* from functional_parquet.DimTbl d
+left semi join functional_parquet.JoinTbl j
+on (j.test_zip = d.zip)
 ---- RESULTS
 1001,'Name1',94611
 1002,'Name2',94611
@@ -180,8 +198,8 @@ bigint, string, int
 ====
 ---- QUERY
 # Regression test for IMPALA-1249. Left anti join on empty build side.
-SELECT a.* FROM functional.SemiJoinTblA a LEFT ANTI JOIN
-(SELECT b.* FROM functional.SemiJoinTblB b WHERE b.a > 10) v ON a.b = v.b
+SELECT a.* FROM SemiJoinTblA a LEFT ANTI JOIN
+(SELECT b.* FROM SemiJoinTblB b WHERE b.a > 10) v ON a.b = v.b
 ---- RESULTS
 1,1,1
 1,1,10
@@ -195,8 +213,8 @@ INT, INT, INT
 ====
 ---- QUERY
 # Testing right semi join
-select b.* FROM functional.SemiJoinTblA a
-right semi join functional.SemiJoinTblB b on a.b = b.b
+select b.* FROM SemiJoinTblA a
+right semi join SemiJoinTblB b on a.b = b.b
 ---- RESULTS
 1,1,10
 1,1,1
@@ -206,8 +224,8 @@ INT, INT, INT
 ====
 ---- QUERY
 # Testing right semi join with duplicates
-SELECT b.int_col
-FROM functional.tinyinttable a RIGHT SEMI JOIN functional.tinyinttable b
+SELECT b.int_col FROM functional_parquet.tinyinttable a
+RIGHT SEMI JOIN functional_parquet.tinyinttable b
 ON a.int_col % 2 = b.int_col % 2
 ---- RESULTS
 0
@@ -225,8 +243,8 @@ INT
 ====
 ---- QUERY
 # Testing right semi join with duplicates and other conjuncts
-SELECT b.int_col
-FROM functional.tinyinttable a RIGHT SEMI JOIN functional.tinyinttable b
+SELECT b.int_col FROM functional_parquet.tinyinttable a
+RIGHT SEMI JOIN functional_parquet.tinyinttable b
 ON a.int_col % 2 = b.int_col % 2 AND a.int_col + b.int_col > 9
 ---- RESULTS
 1
@@ -243,8 +261,8 @@ INT
 ====
 ---- QUERY
 # Testing right anti joins
-select b.* FROM functional.SemiJoinTblA a
-right anti join functional.SemiJoinTblB b on a.b = b.b
+select b.* FROM SemiJoinTblA a
+right anti join SemiJoinTblB b on a.b = b.b
 ---- RESULTS
 2,10,NULL
 1,NULL,10
@@ -255,8 +273,8 @@ INT, INT, INT
 ====
 ---- QUERY
 # Testing right anti join with duplicates and other conjuncts
-SELECT b.int_col
-FROM functional.tinyinttable a RIGHT ANTI JOIN functional.tinyinttable b
+SELECT b.int_col FROM functional_parquet.tinyinttable a
+RIGHT ANTI JOIN functional_parquet.tinyinttable b
 ON a.int_col % 2 = b.int_col % 2 AND a.int_col + b.int_col > 9
 ---- RESULTS
 0
@@ -265,9 +283,9 @@ INT
 ====
 ---- QUERY
 # Anti joins have a uni-directional value transfer (IMPALA-1249).
-select a.* FROM functional.SemiJoinTblA a
+select a.* FROM SemiJoinTblA a
 left anti join
-  (select * from functional.SemiJoinTblB where b <= 3) b
+  (select * from SemiJoinTblB where b <= 3) b
 on a.b = b.b
 ---- RESULTS
 1,3,10
@@ -315,8 +333,8 @@ INT, INT, INT
 #BIGINT
 #====
 # Testing right anti join with empty probe side.
-SELECT b.* FROM (SELECT a.* from functional.SemiJoinTblA a where a.a > 10) v
-RIGHT ANTI JOIN functional.SemiJoinTblB b on v.b = b.b
+SELECT b.* FROM (SELECT a.* from SemiJoinTblA a where a.a > 10) v
+RIGHT ANTI JOIN SemiJoinTblB b on v.b = b.b
 ---- RESULTS
 1,1,1
 1,1,10

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df830901/tests/query_test/test_join_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py
index 14965d8..93fcba5 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -139,52 +139,33 @@ class TestSemiJoinQueries(ImpalaTestSuite):
       # Cut down on execution time when not running in exhaustive mode.
       cls.TestMatrix.add_constraint(lambda v: v.get_value('batch_size') != 1)
 
-  @classmethod
-  def setup_class(cls):
-    super(TestSemiJoinQueries, cls).setup_class()
-    cls.__cleanup_semi_join_tables()
-    cls.__load_semi_join_tables()
-
-  @classmethod
-  def teardown_class(cls):
-    cls.__cleanup_semi_join_tables()
-    super(TestSemiJoinQueries, cls).teardown_class()
-
-  @classmethod
-  def __load_semi_join_tables(cls):
-    SEMIJOIN_TABLES = ['functional.SemiJoinTblA', 'functional.SemiJoinTblB']
-    # Cleanup, create and load fresh test tables for semi/anti-join tests
-    cls.client.execute('create table if not exists '\
-                          'functional.SemiJoinTblA(a int, b int, c int)')
-    cls.client.execute('create table if not exists '\
-                          'functional.SemiJoinTblB(a int, b int, c int)')
-    # loads some values with NULLs in the first table
-    cls.client.execute('insert into %s values(1,1,1)' % SEMIJOIN_TABLES[0]);
-    cls.client.execute('insert into %s values(1,1,10)' % SEMIJOIN_TABLES[0]);
-    cls.client.execute('insert into %s values(1,2,10)' % SEMIJOIN_TABLES[0]);
-    cls.client.execute('insert into %s values(1,3,10)' % SEMIJOIN_TABLES[0]);
-    cls.client.execute('insert into %s values(NULL,NULL,30)'  % SEMIJOIN_TABLES[0]);
-    cls.client.execute('insert into %s values(2,4,30)' % SEMIJOIN_TABLES[0]);
-    cls.client.execute('insert into %s values(2,NULL,20)' % SEMIJOIN_TABLES[0]);
-    # loads some values with NULLs in the second table
-    cls.client.execute('insert into %s values(1,1,1)' % SEMIJOIN_TABLES[1]);
-    cls.client.execute('insert into %s values(1,1,10)' % SEMIJOIN_TABLES[1]);
-    cls.client.execute('insert into %s values(1,2,5)' % SEMIJOIN_TABLES[1]);
-    cls.client.execute('insert into %s values(1,NULL,10)' % SEMIJOIN_TABLES[1]);
-    cls.client.execute('insert into %s values(2,10,NULL)' % SEMIJOIN_TABLES[1]);
-    cls.client.execute('insert into %s values(3,NULL,NULL)' % SEMIJOIN_TABLES[1]);
-    cls.client.execute('insert into %s values(3,NULL,50)' % SEMIJOIN_TABLES[1]);
-
-  @classmethod
-  def __cleanup_semi_join_tables(cls):
-    cls.client.execute('drop table if exists functional.SemiJoinTblA')
-    cls.client.execute('drop table if exists functional.SemiJoinTblB')
-
-  @pytest.mark.execute_serially
-  def test_semi_joins(self, vector):
+  def __load_semi_join_tables(self, db_name):
+    # Create and load fresh test tables for semi/anti-join tests
+    fq_tbl_name_a = '%s.SemiJoinTblA' % db_name
+    self.client.execute('create table %s (a int, b int, c int)' % fq_tbl_name_a)
+    self.client.execute('insert into %s values(1,1,1)' % fq_tbl_name_a);
+    self.client.execute('insert into %s values(1,1,10)' % fq_tbl_name_a);
+    self.client.execute('insert into %s values(1,2,10)' % fq_tbl_name_a);
+    self.client.execute('insert into %s values(1,3,10)' % fq_tbl_name_a);
+    self.client.execute('insert into %s values(NULL,NULL,30)'  % fq_tbl_name_a);
+    self.client.execute('insert into %s values(2,4,30)' % fq_tbl_name_a);
+    self.client.execute('insert into %s values(2,NULL,20)' % fq_tbl_name_a);
+
+    fq_tbl_name_b = '%s.SemiJoinTblB' % db_name
+    self.client.execute('create table %s (a int, b int, c int)' % fq_tbl_name_b)
+    self.client.execute('insert into %s values(1,1,1)' % fq_tbl_name_b);
+    self.client.execute('insert into %s values(1,1,10)' % fq_tbl_name_b);
+    self.client.execute('insert into %s values(1,2,5)' % fq_tbl_name_b);
+    self.client.execute('insert into %s values(1,NULL,10)' % fq_tbl_name_b);
+    self.client.execute('insert into %s values(2,10,NULL)' % fq_tbl_name_b);
+    self.client.execute('insert into %s values(3,NULL,NULL)' % fq_tbl_name_b);
+    self.client.execute('insert into %s values(3,NULL,50)' % fq_tbl_name_b);
+
+  def test_semi_joins(self, vector, unique_database):
     new_vector = copy(vector)
     new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
-    self.run_test_case('QueryTest/semi-joins', new_vector)
+    self.__load_semi_join_tables(unique_database)
+    self.run_test_case('QueryTest/semi-joins', new_vector, unique_database)
 
   def test_semi_joins_exhaustive(self, vector):
     if self.exploration_strategy() != 'exhaustive': pytest.skip()


[2/2] incubator-impala git commit: IMPALA-2831: Bound the number of scanner threads per scan node.

Posted by kw...@apache.org.
IMPALA-2831: Bound the number of scanner threads per scan node.

Our current code base allows a scan node to spin up as many as three
times as many scanner threads as there are logical cpu cores. However,
the scanner threads are cpu bound, so there are diminishing returns
from starting more scanner threads than the number of logical cores.
In fact, it may be detrimental due to context switching overhead.

This change bounds the number of scanner threads spun up by a scan
node to the number of logical cpu cores unless the query option
'num_scanner_threads' is set. The total number of available thread
tokens is unchanged. With this change, the peak memory usage of the
following query on a single node Impala cluster running on a machine
with 8 logical cores is reduced from 287MB to 101MB.

select count(*) from tpch100_parquet.lineitem where l_orderkey > 20

The reduction comes mostly from having fewer outstanding IO buffers.
The IO for scan ranges is scheduled by the scanner threads that
pick them up, and each scan range has at least one IO buffer of
8 to 16MB associated with it. So, the more threads we start up,
the more memory is consumed by the IO buffers, leading to higher
peak memory usage.

Change-Id: I191988ad18d6b4caf892fc967258823edcf9681f
Reviewed-on: http://gerrit.cloudera.org:8080/4174
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ecd78fb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ecd78fb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ecd78fb6

Branch: refs/heads/master
Commit: ecd78fb67d240485c96d60e745c674e0157d9263
Parents: df83090
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Aug 30 10:36:28 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Aug 31 06:58:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node.cc | 29 +++++++++++++++++------------
 be/src/exec/hdfs-scan-node.h  |  6 ++++++
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 9ed78de..4846004 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -123,7 +123,8 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
       all_ranges_started_(false),
       counters_running_(false),
       thread_avail_cb_id_(-1),
-      rm_callback_id_(-1) {
+      rm_callback_id_(-1),
+      max_num_scanner_threads_(CpuInfo::num_cores()) {
   max_materialized_row_batches_ = FLAGS_max_row_batches;
   if (max_materialized_row_batches_ <= 0) {
     // TODO: This parameter has an U-shaped effect on performance: increasing the value
@@ -259,6 +260,8 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
     }
 
     // Issue initial ranges for all file types.
+    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
+        matching_per_type_files[THdfsFileFormat::PARQUET]));
     RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
         matching_per_type_files[THdfsFileFormat::TEXT]));
     RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
@@ -267,8 +270,6 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
         matching_per_type_files[THdfsFileFormat::RC_FILE]));
     RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
         matching_per_type_files[THdfsFileFormat::AVRO]));
-    RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
-        matching_per_type_files[THdfsFileFormat::PARQUET]));
 
     // Release the scanner threads
     ranges_issued_barrier_.Notify();
@@ -730,9 +731,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
   // reservation before any ranges are issued.
   runtime_state_->resource_pool()->ReserveOptionalTokens(1);
   if (runtime_state_->query_options().num_scanner_threads > 0) {
-    runtime_state_->resource_pool()->set_max_quota(
-        runtime_state_->query_options().num_scanner_threads);
+    max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
+  DCHECK_GT(max_num_scanner_threads_, 0);
 
   thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
       bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
@@ -897,7 +898,7 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges
       runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
-  ThreadTokenAvailableCb(runtime_state_->resource_pool());
+  if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
   return Status::OK();
 }
 
@@ -983,8 +984,9 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   //  5. Don't start up a ScannerThread if materialized_row_batches_ is full since
   //     we are not scanner bound.
   //  6. Don't start up a thread if there isn't enough memory left to run it.
-  //  7. Don't start up if there are no thread tokens.
-  //  8. Don't start up if we are running too many threads for our vcore allocation
+  //  7. Don't start up more than maximum number of scanner threads configured.
+  //  8. Don't start up if there are no thread tokens.
+  //  9. Don't start up if we are running too many threads for our vcore allocation
   //  (unless the thread is reserved, in which case it has to run).
 
   // Case 4. We have not issued the initial ranges so don't start a scanner thread.
@@ -1000,7 +1002,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     unique_lock<mutex> lock(lock_);
     // Cases 1, 2, 3.
     if (done_ || all_ranges_started_ ||
-      active_scanner_thread_counter_.value() >= progress_.remaining()) {
+        active_scanner_thread_counter_.value() >= progress_.remaining()) {
       break;
     }
 
@@ -1011,11 +1013,14 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       break;
     }
 
-    // Case 7.
+    // Case 7 and 8.
     bool is_reserved = false;
-    if (!pool->TryAcquireThreadToken(&is_reserved)) break;
+    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
+        !pool->TryAcquireThreadToken(&is_reserved)) {
+      break;
+    }
 
-    // Case 8.
+    // Case 9.
     if (!is_reserved) {
       if (runtime_state_->query_resource_mgr() != NULL &&
           runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ecd78fb6/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index ae38856..36e95b5 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -509,6 +509,12 @@ class HdfsScanNode : public ScanNode {
   /// -1 if no callback is registered.
   int32_t rm_callback_id_;
 
+  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
+  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
+  /// are generally cpu bound so there is no benefit in spinning up more threads than
+  /// the number of cores.
+  int max_num_scanner_threads_;
+
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);