Posted to commits@impala.apache.org by tm...@apache.org on 2018/10/05 21:39:13 UTC

[1/8] impala git commit: IMPALA-7648 part 1: add expected out-of-memory tests

Repository: impala
Updated Branches:
  refs/heads/master 80edf3701 -> 0e1de31ba


IMPALA-7648 part 1: add expected out-of-memory tests

This adds test coverage for some cases where queries are currently
expected to fail with out-of-memory:
* memory limit exceeded in exchange node
* aggregation with large var-len intermediate values
* top N with large limit
* hash join with many duplicates on right side
* analytic with a large window that needs to be buffered in-memory

Note that it's not always fully deterministic where the query hits
'memory limit exceeded', so we don't include the node ID or name in the
expected error message.

Testing:
* ran exhaustive tests
* looped modified tests locally overnight

Change-Id: Icd1a7eb97837b742a967c260cafb5a7f4f45412e
Reviewed-on: http://gerrit.cloudera.org:8080/11564
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c80c62f2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c80c62f2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c80c62f2

Branch: refs/heads/master
Commit: c80c62f22d17235774919b5ffcb08a0aea517aea
Parents: 80edf37
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Oct 1 17:17:50 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 01:36:17 2018 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/exchange-mem-scaling.test | 39 +++++++++++++++++
 .../QueryTest/spilling-no-debug-action.test     | 44 ++++++++++++++++++++
 tests/query_test/test_mem_usage_scaling.py      | 22 +++++++++-
 3 files changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c80c62f2/testdata/workloads/functional-query/queries/QueryTest/exchange-mem-scaling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-mem-scaling.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-mem-scaling.test
new file mode 100644
index 0000000..84ffe34
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-mem-scaling.test
@@ -0,0 +1,39 @@
+====
+---- QUERY
+# IMPALA-5485: exchange nodes can hit out-of-memory. This query was tuned so that
+# it hits the memory limit in the exchange node when allocating receiver-side
+# buffers. It's also possible but less likely that this will hit a memory limit
+# in the scan nodes.
+set mem_limit=200m;
+set num_scanner_threads=1;
+select *
+from tpch_parquet.lineitem l1
+  join tpch_parquet.lineitem l2 on l1.l_orderkey = l2.l_orderkey and
+      l1.l_partkey = l2.l_partkey and l1.l_suppkey = l2.l_suppkey
+      and l1.l_linenumber = l2.l_linenumber
+order by l1.l_orderkey desc, l1.l_partkey, l1.l_suppkey, l1.l_linenumber
+limit 5
+---- CATCH
+Memory limit exceeded
+====
+---- QUERY
+# Run the above query with a high enough memory limit to succeed on the 3-node
+# minicluster.
+set mem_limit=500m;
+set num_scanner_threads=1;
+select *
+from tpch_parquet.lineitem l1
+  join tpch_parquet.lineitem l2 on l1.l_orderkey = l2.l_orderkey and
+      l1.l_partkey = l2.l_partkey and l1.l_suppkey = l2.l_suppkey
+      and l1.l_linenumber = l2.l_linenumber
+order by l1.l_orderkey desc, l1.l_partkey, l1.l_suppkey, l1.l_linenumber
+limit 5
+---- RESULTS
+6000000,32255,2256,1,5.00,5936.25,0.04,0.03,'N','O','1996-11-02','1996-11-19','1996-12-01','TAKE BACK RETURN','MAIL','carefully ',6000000,32255,2256,1,5.00,5936.25,0.04,0.03,'N','O','1996-11-02','1996-11-19','1996-12-01','TAKE BACK RETURN','MAIL','carefully '
+6000000,96127,6128,2,28.00,31447.36,0.01,0.02,'N','O','1996-09-22','1996-10-01','1996-10-21','NONE','AIR','ooze furiously about the pe',6000000,96127,6128,2,28.00,31447.36,0.01,0.02,'N','O','1996-09-22','1996-10-01','1996-10-21','NONE','AIR','ooze furiously about the pe'
+5999975,6452,1453,2,7.00,9509.15,0.04,0.00,'A','F','1993-11-02','1993-09-23','1993-11-19','DELIVER IN PERSON','SHIP','lar pinto beans aft',5999975,6452,1453,2,7.00,9509.15,0.04,0.00,'A','F','1993-11-02','1993-09-23','1993-11-19','DELIVER IN PERSON','SHIP','lar pinto beans aft'
+5999975,7272,2273,1,32.00,37736.64,0.07,0.01,'R','F','1993-10-07','1993-09-30','1993-10-21','COLLECT COD','REG AIR','tructions. excu',5999975,7272,2273,1,32.00,37736.64,0.07,0.01,'R','F','1993-10-07','1993-09-30','1993-10-21','COLLECT COD','REG AIR','tructions. excu'
+5999975,37131,2138,3,18.00,19226.34,0.04,0.01,'A','F','1993-11-17','1993-08-28','1993-12-08','DELIVER IN PERSON','FOB',', quick deposits. ironic, unusual deposi',5999975,37131,2138,3,18.00,19226.34,0.04,0.01,'A','F','1993-11-17','1993-08-28','1993-12-08','DELIVER IN PERSON','FOB',', quick deposits. ironic, unusual deposi'
+---- TYPES
+BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING, BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/c80c62f2/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
index f89e537..539a4fd 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
@@ -64,3 +64,47 @@ BIGINT
 ---- RUNTIME_PROFILE
 row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Aggregation query that will OOM and fail to spill because of IMPALA-3304 without
+# any help from DEBUG_ACTION.
+set mem_limit=100m;
+select l_orderkey, group_concat(l_comment) comments
+from lineitem
+group by l_orderkey
+order by comments desc
+limit 5
+---- CATCH
+Memory limit exceeded
+====
+---- QUERY
+# Top-N query with large limit that will OOM because spilling is not implemented:
+# IMPALA-3471. It does not need any help from DEBUG_ACTION.
+set mem_limit=100m;
+select *
+from lineitem
+order by l_orderkey desc
+limit 6000000
+---- CATCH
+Memory limit exceeded
+====
+---- QUERY
+# Hash join that will fail to repartition and therefore fail with out-of-memory
+# because of a large number of duplicate keys on the build side: IMPALA-4857. It
+# does not need any help from DEBUG_ACTION.
+set mem_limit=150m;
+select straight_join *
+from supplier join /* +broadcast */ lineitem on s_suppkey = l_linenumber
+order by l_tax desc
+limit 5
+---- CATCH
+row_regex:.*Cannot perform hash join at node with id .*. Repartitioning did not reduce the size of a spilled partition.*
+====
+---- QUERY
+# Analytic queries with certain kinds of large windows can't be spilled: IMPALA-5738.
+# This query does not need any help from DEBUG_ACTION.
+set mem_limit=100m;
+select avg(l_tax) over (order by l_orderkey rows between 100000000 preceding and 10000000 following)
+from lineitem
+---- CATCH
+Memory limit exceeded
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/c80c62f2/tests/query_test/test_mem_usage_scaling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py
index 21b320e..69b4f2b 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -19,7 +19,8 @@ import pytest
 from copy import copy
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
-from tests.common.test_dimensions import create_avro_snappy_dimension
+from tests.common.test_dimensions import (create_avro_snappy_dimension,
+    create_parquet_dimension)
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
@@ -364,3 +365,22 @@ class TestScanMemLimit(ImpalaTestSuite):
     # Remove num_nodes setting to allow .test file to set num_nodes.
     del vector.get_value('exec_option')['num_nodes']
     self.run_test_case('QueryTest/hdfs-scanner-thread-mem-scaling', vector)
+
+
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestExchangeMemUsage(ImpalaTestSuite):
+  """Targeted test for exchange memory limits."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestExchangeMemUsage, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension(cls.get_workload()))
+
+  def test_exchange_mem_usage_scaling(self, vector):
+    """Test behaviour of exchange nodes with different memory limits."""
+    self.run_test_case('QueryTest/exchange-mem-scaling', vector)


[4/8] impala git commit: IMPALA-7484: Do not interpret unrecognized hints as straight_join hints.

Posted by tm...@apache.org.
IMPALA-7484: Do not interpret unrecognized hints as
straight_join hints.

Wrapped setIsStraightJoin() in an else clause so it only runs when the
hint is recognized as straight_join.

Testing: Added a wrapper for testing the state of the straight_join
hint. Modified existing tests to use this wrapper for positive and
negative test cases.

Change-Id: Icf600ebbfefc7398e0896df143a0ab91545cae04
Reviewed-on: http://gerrit.cloudera.org:8080/11568
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3a37c724
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3a37c724
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3a37c724

Branch: refs/heads/master
Commit: 3a37c724d4940074421745c54eee4d5c29d47c85
Parents: fc91e70
Author: Anurag Mantripragada <an...@gmail.com>
Authored: Thu Oct 4 13:13:53 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 06:03:39 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/SelectList.java  |  7 +++--
 .../impala/analysis/AnalyzeStmtsTest.java       | 31 ++++++++++++++------
 2 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3a37c724/fe/src/main/java/org/apache/impala/analysis/SelectList.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectList.java b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
index 9794f59..a38774a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectList.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
@@ -82,11 +82,12 @@ public class SelectList {
   public boolean hasPlanHints() { return !planHints_.isEmpty(); }
 
   public void analyzePlanHints(Analyzer analyzer) {
-    for (PlanHint hint: planHints_) {
-      if (!hint.is("straight_join")) {
+    for (PlanHint hint : planHints_) {
+      if (hint.is("straight_join")) {
+        analyzer.setIsStraightJoin();
+      } else {
         analyzer.addWarning("PLAN hint not recognized: " + hint);
       }
-      analyzer.setIsStraightJoin();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3a37c724/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index c508409..d36b97d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1822,25 +1822,38 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     }
   }
 
+  /**
+   * Checks the expected warning message, if applicable, and returns true if the
+   * straight_join hint was applied, false otherwise.
+   */
+  private boolean hasStraightJoin(String stmt, String expectedWarning) {
+    AnalysisContext ctx = createAnalysisCtx();
+    AnalyzesOk(stmt, ctx, expectedWarning);
+    return ctx.getAnalyzer().isStraightJoin();
+  }
+
   @Test
   public void TestSelectListHints() throws AnalysisException {
     for (String[] hintStyle: hintStyles_) {
       String prefix = hintStyle[0];
       String suffix = hintStyle[1];
-      AnalyzesOk(String.format(
-          "select %sstraight_join%s * from functional.alltypes", prefix, suffix));
-      AnalyzesOk(String.format(
-          "select %sStrAigHt_jOiN%s * from functional.alltypes", prefix, suffix));
+      assertTrue(hasStraightJoin(String.format(
+          "select %sstraight_join%s * from functional.alltypes", prefix, suffix), null));
+      assertTrue(hasStraightJoin(String.format(
+          "select %sStrAigHt_jOiN%s * from functional.alltypes", prefix, suffix), null));
       if (!prefix.equals("")) {
         // Only warn on unrecognized hints for view-compatibility with Hive.
         // Legacy hint style does not parse.
-        AnalyzesOk(String.format(
+        assertFalse(hasStraightJoin(String.format(
             "select %sbadhint%s * from functional.alltypes", prefix, suffix),
-            "PLAN hint not recognized: badhint");
+            "PLAN hint not recognized: badhint"));
+        assertTrue(hasStraightJoin(String.format(
+            "select %sstraight_join%s * from functional.alltypes", prefix, suffix),
+            null));
         // Multiple hints. Legacy hint style does not parse.
-        AnalyzesOk(String.format(
-            "select %sstraight_join,straight_join%s * from functional.alltypes",
-            prefix, suffix));
+        assertTrue(hasStraightJoin(String.format(
+            "select %sstraight_join,straight_join%s * from functional.alltypes",
+            prefix, suffix), null));
       }
     }
   }


[2/8] impala git commit: IMPALA-7349: Add Admission control support for automatically setting per host memory limit for a query

Posted by tm...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
new file mode 100644
index 0000000..8cd0d92
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
@@ -0,0 +1,153 @@
+====
+---- QUERY
+# All queries in this file are run with num_nodes=1 by default unless specified.
+############################
+# No mem_limit set
+# check that mem_admitted is the same as mem_estimate
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+select * from (select * from functional_parquet.alltypes limit 10) A,
+ (select * from functional_parquet.alltypes limit 10) B;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=64MB.*
+row_regex: .*Cluster Memory Admitted: 64.00 MB.*
+====
+---- QUERY
+# No mem_limit set
+# lower bound enforced based on largest min_reservation (32.09 MB for this query)
+set request_pool=poolLowMinLimit;
+select * from functional_parquet.alltypes limit 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Cluster Memory Admitted: 32.09 MB.*
+====
+---- QUERY
+# No mem_limit set
+# lower bound enforced based on largest min_reservation (32.09 MB for this query), but
+# the upper bound enforced by pool.max_query_mem_limit takes precedence, which causes
+# the query to be rejected eventually.
+set request_pool=poolLowMaxLimit;
+select * from functional_parquet.alltypes limit 1;
+---- CATCH
+Rejected query from pool root.poolLowMaxLimit: minimum memory reservation is greater than
+ memory available to the query for buffer reservations. Memory reservation needed given
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
+ the plan. See the query profile for more information about the per-node memory
+ requirements.
+====
+---- QUERY
+# No mem_limit set
+# Upper bound enforced by pool.max_query_mem_limit
+set request_pool=regularPool;
+# set this to make estimates deterministic.
+set num_scanner_threads=2;
+select * from functional_parquet.alltypes A, functional_parquet.alltypes B where
+ A.int_col = B.int_col limit 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=2.06GB.*
+row_regex: .*Cluster Memory Admitted: 1.50 GB.*
+====
+---- QUERY
+# No mem_limit set
+# Lower bound enforced by pool.min_query_mem_limit
+set request_pool=regularPool;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+############################
+# mem_limit is set
+# check that mem_admitted is the same as the mem_limit set in query options
+set request_pool=regularPool;
+set mem_limit=200mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 200.00 MB.*
+====
+---- QUERY
+# mem_limit is set
+# No lower bound enforced based on largest min_reservation (32.09 MB for this query)
+set request_pool=poolLowMinLimit;
+set mem_limit=27mb;
+select * from functional_parquet.alltypes limit 1;
+---- CATCH
+Rejected query from pool root.poolLowMinLimit: minimum memory reservation is greater than
+ memory available to the query for buffer reservations. Memory reservation needed given
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
+ the plan. See the query profile for more information about the per-node memory
+ requirements.
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is true
+# Upper bound using pool.max_query_mem_limit
+set request_pool=regularPool;
+set mem_limit=2G;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 1.50 GB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is true
+# Lower bound using pool.min_query_mem_limit
+set request_pool=regularPool;
+set mem_limit=40mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is false
+# Upper bound using pool.max_query_mem_limit
+set request_pool=regularPoolWithoutClamping;
+set mem_limit=2G;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 2.00 GB.*
+====
+---- QUERY
+# mem_limit is set and pool.clamp_mem_limit_query_option is false
+# Lower bound using pool.min_query_mem_limit
+set request_pool=regularPoolWithoutClamping;
+set mem_limit=50mb;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Cluster Memory Admitted: 50.00 MB.*
+====
+---- QUERY
+############################
+# Old behaviour: Both pool.max_query_mem_limit and pool.min_query_mem_limit are zero.
+# No mem_limit set; check that the mem_estimate is used as mem_admitted and the query
+# is allowed to run, implying it passes the min mem_limit check based on the largest
+# min_reservation, because mem_limit is -1 (not set in query options).
+set request_pool=poolNoMemLimits;
+select 1;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Cluster Memory Admitted: 10.00 MB.*
+====
+---- QUERY
+############################
+# Invalid pool config
+# min_query_mem_limit is greater than the max_query_mem_limit
+set request_pool=maxLessThanMinLimit;
+select 1;
+---- CATCH
+Rejected query from pool root.maxLessThanMinLimit: Invalid pool config: the
+ min_query_mem_limit is greater than the max_query_mem_limit (100001 > 100000)
+====
+---- QUERY
+# Invalid pool config
+# min_query_mem_limit is greater than the max_mem_resources
+set request_pool=maxMemLessThanMinLimit;
+select 1;
+---- CATCH
+Rejected query from pool root.maxMemLessThanMinLimit: Invalid pool config: the
+ min_query_mem_limit is greater than the max_mem_resources (2621440001 > 2621440000)
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
index e658c09..66f9047 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-reject-min-reservation.test
@@ -5,7 +5,9 @@ select distinct * from functional_parquet.alltypesagg
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 68.09 MB. Set mem_limit to at least 100.09 MB.
+ current plan: 68.09 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 100.09 MB.
 ====
 ---- QUERY
 set mem_limit=150mb;
@@ -30,7 +32,9 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 14.75 MB. Set mem_limit to at least 46.75 MB.
+ current plan: 14.75 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 46.75 MB.
 ====
 ---- QUERY
 set mem_limit=50mb;
@@ -39,5 +43,7 @@ from tpch_parquet.lineitem join tpch_parquet.orders on l_orderkey = o_orderkey
 ---- CATCH
 minimum memory reservation is greater than memory available to the
  query for buffer reservations. Memory reservation needed given the
- current plan: 26.00 MB. Set mem_limit to at least 58.00 MB.
+ current plan: 26.00 MB. Adjust either the mem_limit or the pool config
+ (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
+ memory limit to be at least 58.00 MB.
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/tests/common/resource_pool_config.py
----------------------------------------------------------------------
diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py
new file mode 100644
index 0000000..adab034
--- /dev/null
+++ b/tests/common/resource_pool_config.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Basic helper class for making dynamic changes to the admission controller config files.
+# This is pretty bare-bones at the moment and only contains functionality necessary for
+# the tests it is used for. However, it is generic enough that it can be extended if
+# more functionality is required for adding tests.
+
+import os
+from time import sleep, time
+import xml.etree.ElementTree as ET
+
+
+class ResourcePoolConfig(object):
+
+  # Mapping of config strings used in the llama_site file to those used on the impala
+  # metrics debug page. Add to this dictionary if other configs are needed for tests.
+  CONFIG_TO_METRIC_STR_MAPPING = {'max-query-mem-limit': 'pool-max-query-mem-limit'}
+
+  def __init__(self, impala_service, llama_site_path):
+    self.impala_service = impala_service
+    self.llama_site_path = llama_site_path
+    tree = ET.parse(llama_site_path)
+    self.root = tree.getroot()
+
+  def set_config_value(self, pool_name, config_str, target_val):
+    """Sets the value for the config parameter 'config_str' for the 'pool_name'
+    resource pool"""
+    node = self.__find_xml_node(self.root, pool_name, config_str)
+    node.find('value').text = str(target_val)
+    self.__write_xml_to_file(self.root, self.llama_site_path)
+    self.__wait_for_impala_to_pickup_config_change(pool_name, config_str, str(target_val))
+
+  def __wait_for_impala_to_pickup_config_change(
+      self, pool_name, config_str, target_val, timeout=20):
+    """Helper method that constantly sends a query for the 'pool_name' resource pool that
+    will be rejected but as a side effect would initiate a refresh of the pool config.
+    Then on every refresh it checks the pool metric corresponding to 'config_str' to see
+    if impala as picked up the change to that metric and is now equal to the
+    'target'val'. Times out after 'timeout' seconds"""
+    metric_str = self.CONFIG_TO_METRIC_STR_MAPPING[config_str]
+    client = self.impala_service.create_beeswax_client()
+    client.set_configuration_option('request_pool', pool_name)
+    # set mem_limit to something above the proc limit so that the query always gets
+    # rejected.
+    client.set_configuration_option('mem_limit', '10G')
+    metric_key = "admission-controller.{0}.root.{1}".format(metric_str, pool_name)
+    start_time = time()
+    while (time() - start_time < timeout):
+      handle = client.execute_async("select 1")
+      client.close_query(handle)
+      current_val = str(self.impala_service.get_metric_value(metric_key))
+      if current_val == target_val:
+        return
+      sleep(0.1)
+    assert False, "Timed out waiting for {0} to reach {1}. Current: {2}".format(
+      metric_key, target_val, current_val)
+
+  def __write_xml_to_file(self, xml_root, file_name):
+    # Make sure the change to the file is atomic. Write to a temp file and replace the
+    # original with it.
+    temp_path = file_name + "-temp"
+    file_handle = open(temp_path, "w")
+    file_handle.write(ET.tostring(xml_root))
+    file_handle.flush()
+    os.fsync(file_handle.fileno())
+    file_handle.close()
+    os.rename(temp_path, file_name)
+
+  def __find_xml_node(self, xml_root, pool_name, pool_attribute):
+    """Returns the xml node corresponding to the 'pool_attribute' for the 'pool_name'"""
+    for prop in xml_root.iter('property'):
+      try:
+        name = prop.find('name').text
+        # e.g. name = impala.admission-control.max-query-mem-limit-bytes.root.pool_name
+        if pool_name == name.split('.')[-1] and pool_attribute in name:
+          return prop
+      except Exception as e:
+        print "Current DOM element being inspected: \n{0}".format(ET.tostring(prop))
+        raise e
+    assert False, "{0} attribute not found for pool {1} in the config XML:\n{2}".format(
+      pool_attribute, pool_name, ET.tostring(xml_root))

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 2d909d8..24e2ce3 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -22,6 +22,7 @@ import logging
 import os
 import pytest
 import re
+import shutil
 import sys
 import threading
 from copy import copy
@@ -32,10 +33,12 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import specific_build_type_timeout, IMPALAD_BUILD
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.resource_pool_config import ResourcePoolConfig
 from tests.common.skip import (
     SkipIfS3,
     SkipIfADLS,
-    SkipIfEC)
+    SkipIfEC,
+    SkipIfNotHdfsMinicluster)
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -43,7 +46,6 @@ from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
-from tests.verifiers.metric_verifier import MetricVerifier
 
 LOG = logging.getLogger('admission_test')
 
@@ -129,7 +131,7 @@ MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
 
 _STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
                      "-statestore_priority_update_frequency_ms={freq_ms}").format(
-                    freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+  freq_ms=STATESTORE_RPC_FREQUENCY_MS)
 
 # Name of the subscriber metric tracking the admission control update interval.
 REQUEST_QUEUE_UPDATE_INTERVAL =\
@@ -149,40 +151,55 @@ QUERY_END_TIMEOUT_S = 1
 INITIAL_QUEUE_REASON_REGEX = \
     "Initial admission queue reason: waited [0-9]* ms, reason: .*"
 
+# The path to resources directory which contains the admission control config files.
+RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources")
+
+
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
-    proc_mem_limit = None, queue_wait_timeout_ms=None):
+                                 proc_mem_limit=None, queue_wait_timeout_ms=None):
   extra_flags = ""
   if proc_mem_limit is not None:
     extra_flags += " -mem_limit={0}".format(proc_mem_limit)
   if queue_wait_timeout_ms is not None:
     extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
   return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
-      "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
-      max_requests, max_queued, pool_max_mem, extra_flags))
-
-
-def impalad_admission_ctrl_config_args(additional_args=""):
-  impalad_home = os.environ['IMPALA_HOME']
-  resources_dir = os.path.join(impalad_home, "fe", "src", "test", "resources")
-  fs_allocation_path = os.path.join(resources_dir, "fair-scheduler-test2.xml")
-  llama_site_path = os.path.join(resources_dir, "llama-site-test2.xml")
+          "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
+            max_requests, max_queued, pool_max_mem, extra_flags))
+
+
+def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
+                                        additional_args="", make_copy=False):
+  fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
+  llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file)
+  if make_copy:
+    copy_fs_allocation_path = os.path.join(RESOURCES_DIR, "copy-" + fs_allocation_file)
+    copy_llama_site_path = os.path.join(RESOURCES_DIR, "copy-" + llama_site_file)
+    shutil.copy2(fs_allocation_path, copy_fs_allocation_path)
+    shutil.copy2(llama_site_path, copy_llama_site_path)
+    fs_allocation_path = copy_fs_allocation_path
+    llama_site_path = copy_llama_site_path
   return ("-vmodule admission-controller=3 -fair_scheduler_allocation_path %s "
-        "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path, additional_args))
+          "-llama_site_path %s %s" % (fs_allocation_path, llama_site_path,
+                                      additional_args))
+
 
 def log_metrics(log_prefix, metrics):
-  LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
+  LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "
       "released=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
       metrics['dequeued'], metrics['rejected'], metrics['released'],
       metrics['timed-out'])
 
+
 def compute_metric_deltas(m2, m1):
   """Returns a dictionary of the differences of metrics in m2 and m1 (m2 - m1)"""
   return dict((n, m2.get(n, 0) - m1.get(n, 0)) for n in m2.keys())
 
+
 def metric_key(pool_name, metric_name):
   """Helper method to construct the admission controller metric keys"""
   return "admission-controller.%s.%s" % (metric_name, pool_name)
 
+
 class TestAdmissionControllerBase(CustomClusterTestSuite):
   @classmethod
   def get_workload(self):
@@ -196,6 +213,7 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
       create_uncompressed_text_dimension(cls.get_workload()))
 
+
 class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   def __check_pool_rejected(self, client, pool, expected_error_re):
     try:
@@ -227,7 +245,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     execute_statement_req.sessionHandle = self.session_handle
     execute_statement_req.confOverlay = {'request_pool': pool_name}
     if mem_limit is not None: execute_statement_req.confOverlay['mem_limit'] = mem_limit
-    execute_statement_req.statement = "select 1";
+    execute_statement_req.statement = "select 1"
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
     HS2TestSuite.check_response(execute_statement_resp)
 
@@ -262,7 +280,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         handles.append(client.execute_async(query))
       for query, handle in zip(queries, handles):
         self._wait_for_state(client, handle, client.QUERY_STATES['FINISHED'], timeout_s)
-        results = self.client.fetch(query, handle)
+        self.client.fetch(query, handle)
         profiles.append(self.client.get_runtime_profile(handle))
       return profiles
     finally:
@@ -279,7 +297,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_config_args(),
+      impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-test2.xml",
+        llama_site_file="llama-site-test2.xml"),
       default_query_options=[('mem_limit', 200000000)],
       statestored_args=_STATESTORED_ARGS)
   @needs_session(conf_overlay={'batch_size': '100'})
@@ -290,7 +310,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     impalad = self.cluster.impalads[0]
     client = impalad.service.create_beeswax_client()
     # Expected default mem limit for queueA, used in several tests below
-    queueA_mem_limit = "MEM_LIMIT=%s" % (128*1024*1024)
+    queueA_mem_limit = "MEM_LIMIT=%s" % (128 * 1024 * 1024)
     try:
       for pool in ['', 'not_a_pool_name']:
         expected_error =\
@@ -309,7 +329,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # Query should execute in queueB which doesn't have a default mem limit set in the
       # llama-site.xml, so it should inherit the value from the default process query
       # options.
-      self.__check_query_options(result.runtime_profile,\
+      self.__check_query_options(result.runtime_profile,
           ['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB'])
 
       # Try setting the pool for a queue with a very low queue timeout.
@@ -320,7 +340,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       self.__check_pool_rejected(client, 'root.queueA', "exceeded timeout")
       assert client.get_state(handle) == client.QUERY_STATES['FINISHED']
       # queueA has default query options mem_limit=128m,query_timeout_s=5
-      self.__check_query_options(client.get_runtime_profile(handle),\
+      self.__check_query_options(client.get_runtime_profile(handle),
           [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA'])
       client.close_query(handle)
 
@@ -330,8 +350,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       client.execute("set mem_limit=31337")
       client.execute("set abort_on_error=1")
       result = client.execute("select 1")
-      self.__check_query_options(result.runtime_profile,\
-          ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',\
+      self.__check_query_options(result.runtime_profile,
+          ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',
            'REQUEST_POOL=root.queueA'])
 
       # Should be able to set query options (overriding defaults if applicable) with the
@@ -339,8 +359,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # max_io_buffers has no proc/pool default.
       client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345'})
       result = client.execute("select 1")
-      self.__check_query_options(result.runtime_profile,\
-          ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',\
+      self.__check_query_options(result.runtime_profile,
+          ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',
            'ABORT_ON_ERROR=1'])
 
       # Once options are reset to their defaults, the queue
@@ -349,7 +369,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # abort on error, because it's back to being the default.
       client.execute('set mem_limit=""')
       client.execute('set abort_on_error=""')
-      client.set_configuration({ 'request_pool': 'root.queueA' })
+      client.set_configuration({'request_pool': 'root.queueA'})
       result = client.execute("select 1")
       self.__check_query_options(result.runtime_profile,
             [queueA_mem_limit, 'REQUEST_POOL=root.queueA', 'QUERY_TIMEOUT_S=5'])
@@ -363,18 +383,21 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     batch_size = "BATCH_SIZE=100"
 
     # Check HS2 query in queueA gets the correct query options for the pool.
-    self.__check_hs2_query_opts("root.queueA", None,\
+    self.__check_hs2_query_opts("root.queueA", None,
         [queueA_mem_limit, 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
     # Check overriding the mem limit sent in the confOverlay with the query.
-    self.__check_hs2_query_opts("root.queueA", '12345',\
+    self.__check_hs2_query_opts("root.queueA", '12345',
         ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', batch_size])
     # Check HS2 query in queueB gets the process-wide default query options
-    self.__check_hs2_query_opts("root.queueB", None,\
+    self.__check_hs2_query_opts("root.queueB", None,
         ['MEM_LIMIT=200000000', 'REQUEST_POOL=root.queueB', batch_size])
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_config_args("-require_username"),
+      impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-test2.xml",
+        llama_site_file="llama-site-test2.xml",
+        additional_args="-require_username"),
       statestored_args=_STATESTORED_ARGS)
   def test_require_user(self):
     open_session_req = TCLIService.TOpenSessionReq()
@@ -394,7 +417,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       close_req.sessionHandle = open_session_resp.sessionHandle
       TestAdmissionController.check_response(self.hs2_client.CloseSession(close_req))
 
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
@@ -503,7 +525,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
       impalad_with_2g_mem.execute_async("select sleep(1000)")
       # Wait for statestore update to update the mem admitted in each node.
-      sleep(STATESTORE_RPC_FREQUENCY_MS/1000)
+      sleep(STATESTORE_RPC_FREQUENCY_MS / 1000)
       exec_options = copy(vector.get_value('exec_option'))
       exec_options['mem_limit'] = "2G"
       # Since Queuing is synchronous and we can't close the previous query till this
@@ -518,7 +540,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args= "--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
+    impalad_args="--logbuflevel=-1 " + impalad_admission_ctrl_flags(max_requests=1,
         max_queued=1, pool_max_mem=PROC_MEM_TEST_LIMIT),
     statestored_args=_STATESTORED_ARGS)
   def test_cancellation(self):
@@ -528,7 +550,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     client = impalad.service.create_beeswax_client()
     try:
       client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
-      client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1 )
+      client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1)
       handle = client.execute_async("select 1")
       sleep(1)
       client.close_query(handle)
@@ -638,7 +660,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         "9.00 MB but only 1.00 MB was available."
     NUM_QUERIES = 5
     profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
-        TIMEOUT_S, {'mem_limit':'9mb'})
+        TIMEOUT_S, {'mem_limit': '9mb'})
 
     num_reasons = len([profile for profile in profiles if EXPECTED_REASON in profile])
     assert num_reasons == NUM_QUERIES - 1, \
@@ -702,6 +724,95 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
      assert num_q == expected_num, "There should be {0} running queries on either " \
                                    "impalad: {1}".format(expected_num, query_locations)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml", make_copy=True),
+    statestored_args=_STATESTORED_ARGS)
+  def test_pool_mem_limit_configs(self, vector):
+    """Runs functional tests for the max/min_query_mem_limit pool config attributes"""
+    exec_options = vector.get_value('exec_option')
+    # Set this to the default.
+    exec_options['exec_single_node_rows_threshold'] = 100
+    # Set num_nodes to 1 since its easier to see one-to-one mapping of per_host and
+    # per_cluster values used in the test.
+    exec_options['num_nodes'] = 1
+    self.run_test_case('QueryTest/admission-max-min-mem-limits', vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml",
+      additional_args="-default_pool_max_requests 1", make_copy=True),
+    statestored_args=_STATESTORED_ARGS)
+  def test_pool_config_change_while_queued(self, vector):
+    """Tests if the invalid checks work even if the query is queued. Makes sure the query
+    is not dequeued if the config is invalid and is promptly dequeued when it goes back
+    to being valid"""
+    pool_name = "invalidTestPool"
+    config_str = "max-query-mem-limit"
+    self.client.set_configuration_option('request_pool', pool_name)
+    # Setup to queue a query.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    queued_query_handle = self.client.execute_async("select 1")
+    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+
+    # Change config to be invalid.
+    llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
+    config = ResourcePoolConfig(self.cluster.impalads[0].service, llama_site_path)
+    config.set_config_value(pool_name, config_str, 1)
+    # Close running query so the queued one gets a chance.
+    self.client.close_query(sleep_query_handle)
+
+    # Check latest queued reason changed
+    queued_reason = "Latest admission queue reason: Invalid pool config: the " \
+                    "min_query_mem_limit is greater than the max_query_mem_limit" \
+                    " (26214400 > 1)"
+    self.__wait_for_change_to_profile(queued_query_handle, queued_reason)
+
+    # Now change the config back to a valid value and make sure the query can run.
+    config.set_config_value(pool_name, config_str, 0)
+    self.client.wait_for_finished_timeout(queued_query_handle, 20)
+    self.close_query(queued_query_handle)
+
+    # Now do the same thing for change to pool.max-query-mem-limit such that it can no
+    # longer accommodate the largest min_reservation.
+    # Setup to queue a query.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    queued_query_handle = self.client.execute_async(
+      "select * from functional_parquet.alltypes limit 1")
+    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    # Change config to something less than what is required to accommodate the
+    # largest min_reservation (which in this case is 32.09 MB).
+    config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
+    # Close running query so the queued one gets a chance.
+    self.client.close_query(sleep_query_handle)
+    # Check latest queued reason changed
+    queued_reason = "minimum memory reservation is greater than memory available to " \
+                    "the query for buffer reservations. Memory reservation needed given" \
+                    " the current plan: 88.00 KB. Adjust either the mem_limit or the" \
+                    " pool config (max-query-mem-limit, min-query-mem-limit) for the" \
+                    " query to allow the query memory limit to be at least 32.09 MB."
+    self.__wait_for_change_to_profile(queued_query_handle, queued_reason, 5)
+    # Now change the config back to a reasonable value.
+    config.set_config_value(pool_name, config_str, 0)
+    self.client.wait_for_finished_timeout(queued_query_handle, 20)
+    self.close_query(queued_query_handle)
+
+  def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+    for _ in range(timeout * 10):
+      profile = self.client.get_runtime_profile(query_handle)
+      if search_string in profile:
+        return
+      sleep(0.1)
+    assert False, "Timed out waiting for change to profile\nSearch " \
+                  "String: {0}\nProfile:\n{1}".format(search_string, str(profile))
+
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
@@ -753,8 +864,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       num_queries = 30
       cls.ImpalaTestMatrix.add_constraint(
           lambda v: v.get_value('submission_delay_ms') == 0)
-      cls.ImpalaTestMatrix.add_constraint(\
-          lambda v: v.get_value('round_robin_submission') == True)
+      cls.ImpalaTestMatrix.add_constraint(
+          lambda v: v.get_value('round_robin_submission'))
 
     if num_queries is not None:
       cls.ImpalaTestMatrix.add_constraint(
@@ -801,8 +912,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     The metrics names are shortened for brevity: 'admitted', 'queued', 'dequeued',
     'rejected', 'released', and 'timed-out'.
     """
-    metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected' : 0,
-        'released': 0, 'timed-out': 0}
+    metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected': 0,
+               'released': 0, 'timed-out': 0}
     for impalad in self.impalads:
       keys = [metric_key(self.pool_name, 'total-%s' % short_name)
               for short_name in metrics.keys()]
@@ -847,8 +958,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       current = self.get_admission_metrics()
       log_metrics("wait_for_metric_changes, current=", current)
       deltas = compute_metric_deltas(current, initial)
-      delta_sum = sum([ deltas[x] for x in metric_names ])
-      LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
+      delta_sum = sum([deltas[x] for x in metric_names])
+      LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",
           delta_sum, deltas, expected_delta, metric_names)
       if delta_sum >= expected_delta:
         LOG.info("Found all %s metrics after %s seconds", delta_sum,
@@ -857,13 +968,13 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting {0} seconds for metrics {1} delta {2} "\
           "current {3} initial {4}" .format(
-              STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(current), str(initial))
+              STRESS_TIMEOUT, ','.join(metric_names), expected_delta, str(current),
+              str(initial))
       sleep(1)
 
   def wait_for_statestore_updates(self, heartbeats):
     """Waits for a number of admission control statestore updates from all impalads."""
     start_time = time()
-    num_impalads = len(self.impalads)
     init = dict()
     curr = dict()
     for impalad in self.impalads:
@@ -898,7 +1009,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     while len(self.executing_threads) < num_threads:
       assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s seconds for "
           "%s admitted client rpcs to return. Only %s executing " % (
-          STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
+            STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
       sleep(0.1)
     LOG.info("Found all %s admitted threads after %s seconds", num_threads,
         round(time() - start_time, 1))
@@ -955,7 +1066,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       # lock protects query_handle and shutdown, used by the main thread in teardown()
       self.lock = threading.RLock()
       self.query_handle = None
-      self.shutdown = False # Set by the main thread when tearing down
+      self.shutdown = False  # Set by the main thread when tearing down
 
     def run(self):
       client = None
@@ -1039,7 +1150,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         # Sleep and wait for the query to be cancelled. The cancellation will
         # set the state to EXCEPTION.
         start_time = time()
-        while (client.get_state(self.query_handle) != \
+        while (client.get_state(self.query_handle) !=
                client.QUERY_STATES['EXCEPTION']):
           assert (time() - start_time < STRESS_TIMEOUT),\
             "Timed out waiting %s seconds for query cancel" % (STRESS_TIMEOUT,)
@@ -1060,7 +1171,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     admission control."""
     for impalad in self.impalads:
       queries_json = impalad.service.get_debug_webpage_json('/queries')
-      for query in itertools.chain(queries_json['in_flight_queries'], \
+      for query in itertools.chain(queries_json['in_flight_queries'],
           queries_json['completed_queries']):
         if query['stmt_type'] == 'QUERY' or query['stmt_type'] == 'DML':
           assert query['last_event'] != 'Registered' and \
@@ -1090,8 +1201,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
 
     num_queries = vector.get_value('num_queries')
     assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
-    initial_metrics = self.get_admission_metrics();
-    log_metrics("Initial metrics: ", initial_metrics);
+    initial_metrics = self.get_admission_metrics()
+    log_metrics("Initial metrics: ", initial_metrics)
 
     for query_num in xrange(num_queries):
       impalad = self.impalads[query_num % len(self.impalads)]
@@ -1109,7 +1220,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # without this thread explicitly ending them, so that the test can admit queries in
     # discrete waves.
     LOG.info("Wait for initial admission decisions")
-    (metric_deltas, curr_metrics) = self.wait_for_metric_changes(\
+    (metric_deltas, curr_metrics) = self.wait_for_metric_changes(
         ['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
     # Also wait for the test threads that submitted the queries to start executing.
     self.wait_for_admitted_threads(metric_deltas['admitted'])
@@ -1129,7 +1240,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     assert metric_deltas['queued'] <= MAX_NUM_QUEUED_QUERIES * len(self.impalads),\
         "Queued too many queries: at least one daemon queued too many"
     assert metric_deltas['rejected'] + metric_deltas['admitted'] +\
-        metric_deltas['queued'] == num_queries ,\
+        metric_deltas['queued'] == num_queries,\
         "Initial admission decisions don't add up to {0}: {1}".format(
         num_queries, str(metric_deltas))
     initial_metric_deltas = metric_deltas
@@ -1144,8 +1255,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # Admit queries in waves until all queries are done. A new wave of admission
     # is started by killing some of the running queries.
     while len(self.executing_threads) > 0:
-      curr_metrics = self.get_consistent_admission_metrics(num_queries);
-      log_metrics("Main loop, curr_metrics: ", curr_metrics);
+      curr_metrics = self.get_consistent_admission_metrics(num_queries)
+      log_metrics("Main loop, curr_metrics: ", curr_metrics)
       num_to_end = len(self.executing_threads)
       LOG.info("Main loop, will request %s queries to end", num_to_end)
       self.end_admitted_queries(num_to_end)
@@ -1166,8 +1277,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       # state or we may find an impalad dequeue more requests after we capture metrics.
       self.wait_for_statestore_updates(10)
 
-    final_metrics = self.get_consistent_admission_metrics(num_queries);
-    log_metrics("Final metrics: ", final_metrics);
+    final_metrics = self.get_consistent_admission_metrics(num_queries)
+    log_metrics("Final metrics: ", final_metrics)
     metric_deltas = compute_metric_deltas(final_metrics, initial_metrics)
     assert metric_deltas['timed-out'] == 0
 
@@ -1212,8 +1323,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args=impalad_admission_ctrl_config_args(),
-      statestored_args=_STATESTORED_ARGS)
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="fair-scheduler-test2.xml",
+      llama_site_file="llama-site-test2.xml"),
+    statestored_args=_STATESTORED_ARGS)
   def test_admission_controller_with_configs(self, vector):
     self.pool_name = 'root.queueB'
     self.run_admission_test(vector, {'request_pool': self.pool_name})
@@ -1241,7 +1354,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # settings of the OS. It should be fine to continue anyway.
     proc_limit = self.get_proc_limit()
     if proc_limit != MEM_TEST_LIMIT:
-      LOG.info("Warning: Process mem limit %s is not expected val %s", limit_val,
+      LOG.info("Warning: Process mem limit %s is not expected val %s", proc_limit,
           MEM_TEST_LIMIT)
 
     self.pool_name = 'default-pool'


[3/8] impala git commit: IMPALA-7349: Add Admission control support for automatically setting per host memory limit for a query

Posted by tm...@apache.org.
IMPALA-7349: Add Admission control support for automatically setting
per host memory limit for a query

With this patch, the per-host memory limit of a query is automatically
set using the mem_limit set in the query options and the mem_estimate
calculated by the planner, based on the following pseudocode:

if mem_limit is set in query options:
  use that and if 'clamp-mem-limit-query-option' is true:
    enforce the min/max query mem limits defined in the pool config.
else:
  mem_limit = max(mem_estimate,
    min_mem_limit_required_to_accommodate_largest_initial_reservation)
  finally, enforce the min/max query mem limits defined in the pool
  config on this value.

This calculated mem limit will also be used for admission accounting
and consequently for admission control. Moreover, three new pool
configuration options have been added to enable this behaviour:

"min-query-mem-limit" & "max-query-mem-limit" => help
clamp the per host memory limit for a query. If both these limits
are not configured, then the estimates from planning are not used
as a memory limit and only used for making admission decisions.
Moreover the estimates will no longer have a lower bound based
on the largest initial reservation.

"clamp-mem-limit-query-option" => if false, the mem_limit defined in
the query options is used directly and the max/min query mem limits
are not enforced on it.
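
For illustration, here is a minimal C++ sketch of the limit computation
described by the pseudocode above. The function name
ComputePerHostMemLimit and its parameters are hypothetical stand-ins
for the pool config fields and planner outputs, and 0 is taken to mean
an unconfigured pool limit; this is a sketch, not the code added by
the patch:

#include <algorithm>
#include <cstdint>

// Hypothetical sketch of the per-host memory limit computation.
// 'mem_limit_option' is the MEM_LIMIT query option (-1 if unset),
// 'min_mem_for_largest_reservation' is the smallest mem limit that can
// accommodate the query's largest initial reservation, and the pool
// limits use 0 to mean "not configured".
int64_t ComputePerHostMemLimit(int64_t mem_limit_option, int64_t mem_estimate,
    int64_t min_mem_for_largest_reservation, int64_t min_query_mem_limit,
    int64_t max_query_mem_limit, bool clamp_mem_limit_query_option) {
  int64_t mem_limit;
  if (mem_limit_option > 0) {
    mem_limit = mem_limit_option;
    // The query option is only clamped by the pool limits if the pool
    // config says so.
    if (!clamp_mem_limit_query_option) return mem_limit;
  } else {
    // No MEM_LIMIT set: use the planner estimate, bounded below so the
    // largest initial reservation always fits.
    mem_limit = std::max(mem_estimate, min_mem_for_largest_reservation);
  }
  // Finally, enforce the pool's min/max query mem limits.
  if (min_query_mem_limit > 0) mem_limit = std::max(mem_limit, min_query_mem_limit);
  if (max_query_mem_limit > 0) mem_limit = std::min(mem_limit, max_query_mem_limit);
  return mem_limit;
}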

Testing:
Added e2e test cases.
Added frontend tests for changes to RequestPoolService.
Successfully passed exhaustive tests.

Change-Id: Ifec00141651982f5975803c2165b7d7a10ebeaa6
Reviewed-on: http://gerrit.cloudera.org:8080/11157
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fc91e706
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fc91e706
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fc91e706

Branch: refs/heads/master
Commit: fc91e706b4f3b45cdda28d977f652cee3f050e7b
Parents: c80c62f
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Wed Jul 25 18:35:47 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 04:38:24 2018 +0000

----------------------------------------------------------------------
 .../benchmarks/process-wide-locks-benchmark.cc  |   2 +-
 be/src/runtime/coordinator-backend-state.cc     |   1 +
 be/src/runtime/coordinator.cc                   |   3 +-
 be/src/runtime/mem-tracker.cc                   |  16 +-
 be/src/runtime/mem-tracker.h                    |  12 +-
 be/src/runtime/query-exec-mgr.cc                |  12 +-
 be/src/runtime/query-exec-mgr.h                 |   5 +-
 be/src/runtime/query-state.cc                   |  18 +-
 be/src/runtime/query-state.h                    |  13 +-
 be/src/runtime/runtime-state.cc                 |   9 +-
 be/src/runtime/test-env.cc                      |   7 +-
 be/src/scheduling/admission-controller.cc       | 204 +++++++++++------
 be/src/scheduling/admission-controller.h        | 141 ++++++++----
 be/src/scheduling/query-schedule.cc             |  85 ++++---
 be/src/scheduling/query-schedule.h              |  57 ++++-
 be/src/scheduling/request-pool-service.cc       |   3 +
 be/src/scheduling/scheduler.cc                  |   4 +
 be/src/service/client-request-state.cc          |   5 +-
 common/thrift/ImpalaInternalService.thrift      |  18 ++
 common/thrift/metrics.json                      |  30 +++
 .../apache/impala/util/RequestPoolService.java  |  41 +++-
 .../impala/util/TestRequestPoolService.java     |  19 +-
 fe/src/test/resources/fair-scheduler-test.xml   |   4 +
 fe/src/test/resources/llama-site-test.xml       |  12 +
 .../resources/mem-limit-test-fair-scheduler.xml |  46 ++++
 .../resources/mem-limit-test-llama-site.xml     |  88 +++++++
 .../QueryTest/admission-max-min-mem-limits.test | 153 +++++++++++++
 .../admission-reject-min-reservation.test       |  12 +-
 tests/common/resource_pool_config.py            |  96 ++++++++
 .../custom_cluster/test_admission_controller.py | 229 ++++++++++++++-----
 30 files changed, 1077 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/benchmarks/process-wide-locks-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/process-wide-locks-benchmark.cc b/be/src/benchmarks/process-wide-locks-benchmark.cc
index 465bb00..373f5b7 100644
--- a/be/src/benchmarks/process-wide-locks-benchmark.cc
+++ b/be/src/benchmarks/process-wide-locks-benchmark.cc
@@ -84,7 +84,7 @@ void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
   query_ctx.__set_request_pool(resolved_pool);
 
   QueryState *query_state;
-  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx);
+  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx, -1);
   DCHECK(query_state != nullptr);
   query_state->AcquireBackendResourceRefcount();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 95484a4..9acab79 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -84,6 +84,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
       backend_exec_params_->min_mem_reservation_bytes);
   rpc_params->__set_initial_mem_reservation_total_claims(
       backend_exec_params_->initial_mem_reservation_total_claims);
+  rpc_params->__set_per_backend_mem_limit(coord_.schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
   rpc_params->__isset.fragment_ctxs = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 04f9392..eb9fe81 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -107,7 +107,8 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
-  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
+  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
+      query_ctx(), schedule_.per_backend_mem_limit());
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index d204ce8..a2e2295 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -209,26 +209,22 @@ MemTracker* PoolMemTrackerRegistry::GetRequestPoolMemTracker(
 }
 
 MemTracker* MemTracker::CreateQueryMemTracker(const TUniqueId& id,
-    const TQueryOptions& query_options, const string& pool_name, ObjectPool* obj_pool) {
-  int64_t byte_limit = -1;
-  if (query_options.__isset.mem_limit && query_options.mem_limit > 0) {
-    byte_limit = query_options.mem_limit;
-  }
-  if (byte_limit != -1) {
-    if (byte_limit > MemInfo::physical_mem()) {
-      LOG(WARNING) << "Memory limit " << PrettyPrinter::Print(byte_limit, TUnit::BYTES)
+    int64_t mem_limit, const string& pool_name, ObjectPool* obj_pool) {
+  if (mem_limit != -1) {
+    if (mem_limit > MemInfo::physical_mem()) {
+      LOG(WARNING) << "Memory limit " << PrettyPrinter::Print(mem_limit, TUnit::BYTES)
                    << " exceeds physical memory of "
                    << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES);
     }
     VLOG(2) << "Using query memory limit: "
-            << PrettyPrinter::Print(byte_limit, TUnit::BYTES);
+            << PrettyPrinter::Print(mem_limit, TUnit::BYTES);
   }
 
   MemTracker* pool_tracker =
       ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(
           pool_name, true);
   MemTracker* tracker = obj_pool->Add(new MemTracker(
-      byte_limit, Substitute("Query($0)", PrintId(id)), pool_tracker));
+      mem_limit, Substitute("Query($0)", PrintId(id)), pool_tracker));
   tracker->is_query_mem_tracker_ = true;
   tracker->query_id_ = id;
   return tracker;

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index d4b4dd4..756a22a 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -120,13 +120,11 @@ class MemTracker {
   /// The counters should be owned by the fragment's RuntimeProfile.
   void EnableReservationReporting(const ReservationTrackerCounters& counters);
 
-  /// Construct a MemTracker object for query 'id'. The query limits are determined based
-  /// on 'query_options'. The MemTracker is a child of the request pool MemTracker for
-  /// 'pool_name', which is created if needed. The returned MemTracker is owned by
-  /// 'obj_pool'.
-  static MemTracker* CreateQueryMemTracker(const TUniqueId& id,
-      const TQueryOptions& query_options, const std::string& pool_name,
-      ObjectPool* obj_pool);
+  /// Construct a MemTracker object for query 'id' with 'mem_limit' as the memory limit.
+  /// The MemTracker is a child of the request pool MemTracker for 'pool_name', which is
+  /// created if needed. The returned MemTracker is owned by 'obj_pool'.
+  static MemTracker* CreateQueryMemTracker(const TUniqueId& id, int64_t mem_limit,
+      const std::string& pool_name, ObjectPool* obj_pool);
 
   /// Increases consumption of this tracker and its ancestors by 'bytes'.
   void Consume(int64_t bytes) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 4e1a340..1eca80e 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -47,7 +47,8 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
           << " coord=" << TNetworkAddressToString(params.query_ctx.coord_address);
 
   bool dummy;
-  QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
+  QueryState* qs =
+      GetOrCreateQueryState(params.query_ctx, params.per_backend_mem_limit, &dummy);
   Status status = qs->Init(params);
   if (!status.ok()) {
     qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
@@ -71,9 +72,10 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   return Status::OK();
 }
 
-QueryState* QueryExecMgr::CreateQueryState(const TQueryCtx& query_ctx) {
+QueryState* QueryExecMgr::CreateQueryState(
+    const TQueryCtx& query_ctx, int64_t mem_limit) {
   bool created;
-  QueryState* qs = GetOrCreateQueryState(query_ctx, &created);
+  QueryState* qs = GetOrCreateQueryState(query_ctx, mem_limit, &created);
   DCHECK(created);
   return qs;
 }
@@ -97,7 +99,7 @@ QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
 }
 
 QueryState* QueryExecMgr::GetOrCreateQueryState(
-    const TQueryCtx& query_ctx, bool* created) {
+    const TQueryCtx& query_ctx, int64_t mem_limit, bool* created) {
   QueryState* qs = nullptr;
   int refcnt;
   {
@@ -108,7 +110,7 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
     auto it = map_ref->find(query_ctx.query_id);
     if (it == map_ref->end()) {
       // register new QueryState
-      qs = new QueryState(query_ctx);
+      qs = new QueryState(query_ctx, mem_limit);
       map_ref->insert(make_pair(query_ctx.query_id, qs));
       *created = true;
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index bddd731..262ef59 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -54,7 +54,7 @@ class QueryExecMgr : public CacheLineAligned {
   /// Creates a QueryState for the given query with the provided parameters. Only valid
   /// to call if the QueryState does not already exist. The caller must call
   /// ReleaseQueryState() with the returned QueryState to decrement the refcount.
-  QueryState* CreateQueryState(const TQueryCtx& query_ctx);
+  QueryState* CreateQueryState(const TQueryCtx& query_ctx, int64_t mem_limit);
 
   /// If a QueryState for the given query exists, increments that refcount and returns
   /// the QueryState, otherwise returns nullptr.
@@ -71,7 +71,8 @@ class QueryExecMgr : public CacheLineAligned {
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.
   /// Increments the refcount.
-  QueryState* GetOrCreateQueryState(const TQueryCtx& query_ctx, bool* created);
+  QueryState* GetOrCreateQueryState(
+      const TQueryCtx& query_ctx, int64_t mem_limit, bool* created);
 
   /// Execute instances and decrement refcount (acquire ownership of qs).
   void StartQueryHelper(QueryState* qs);

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1dc41dc..12710bf 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -55,7 +55,8 @@ QueryState::ScopedRef::~ScopedRef() {
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
 }
 
-QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
+QueryState::QueryState(
+    const TQueryCtx& query_ctx, int64_t mem_limit, const string& request_pool)
   : query_ctx_(query_ctx),
     backend_resource_refcnt_(0),
     refcnt_(0),
@@ -77,7 +78,8 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   if (query_options.batch_size <= 0) {
     query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
   }
-  InitMemTrackers();
+  query_mem_tracker_ = MemTracker::CreateQueryMemTracker(
+      query_id(), mem_limit, query_ctx_.request_pool, &obj_pool_);
 }
 
 void QueryState::ReleaseBackendResources() {
@@ -154,18 +156,6 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   return Status::OK();
 }
 
-void QueryState::InitMemTrackers() {
-  const string& pool = query_ctx_.request_pool;
-  int64_t bytes_limit = -1;
-  if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
-    bytes_limit = query_options().mem_limit;
-    VLOG(2) << "Using query memory limit from query options: "
-            << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
-  }
-  query_mem_tracker_ =
-      MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_);
-}
-
 Status QueryState::InitBufferPoolState() {
   ExecEnv* exec_env = ExecEnv::GetInstance();
   int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 5cb499a..9810156 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -390,17 +390,16 @@ class QueryState {
   /// StartFInstances().
   int64_t fragment_events_start_time_ = 0;
 
-  /// Create QueryState w/ refcnt of 0.
-  /// The query is associated with the resource pool query_ctx.request_pool or
-  /// 'request_pool', if the former is not set (needed for tests).
-  QueryState(const TQueryCtx& query_ctx, const std::string& request_pool = "");
+  /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
+  /// to the query mem tracker. The query is associated with the resource pool set in
+  /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
+  /// for tests).
+  QueryState(const TQueryCtx& query_ctx, int64_t mem_limit,
+      const std::string& request_pool = "");
 
   /// Execute the fragment instance and decrement the refcnt when done.
   void ExecFInstance(FragmentInstanceState* fis);
 
-  /// Called from constructor to initialize MemTrackers.
-  void InitMemTrackers();
-
   /// Called from Init() to set up buffer reservations and the file group.
   Status InitBufferPoolState() WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 385087c..127abb4 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -80,10 +80,15 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
 }
 
 // Constructor for standalone RuntimeState for test execution and fe-support.cc.
-// Sets up a dummy local QueryState to allow evaluating exprs, etc.
+// Sets up a dummy local QueryState (with mem_limit picked up from the query options)
+// to allow evaluating exprs, etc.
 RuntimeState::RuntimeState(
     const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl)
-  : query_state_(new QueryState(qctx, "test-pool")),
+  : query_state_(new QueryState(qctx, qctx.client_request.query_options.__isset.mem_limit
+                && qctx.client_request.query_options.mem_limit > 0 ?
+            qctx.client_request.query_options.mem_limit :
+            -1,
+        "test-pool")),
     fragment_ctx_(nullptr),
     instance_ctx_(nullptr),
     local_query_state_(query_state_),

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 486a230..ef5e978 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -113,9 +113,14 @@ Status TestEnv::CreateQueryState(
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
   query_ctx.request_pool = "test-pool";
+  TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
+  int64_t mem_limit =
+      query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ?
+      query_options_to_use->mem_limit :
+      -1;
 
   // CreateQueryState() enforces the invariant that 'query_id' must be unique.
-  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx);
+  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, mem_limit);
   query_states_.push_back(qs);
   // make sure to initialize data structures unrelated to the TExecQueryFInstancesParams
   // param

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 2e1f7d9..a23fce7 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -41,7 +41,7 @@ DEFINE_int64(queue_wait_timeout_ms, 60 * 1000, "Maximum amount of time (in "
 namespace impala {
 
 /// Convenience method.
-std::string PrintBytes(int64_t value) {
+string PrintBytes(int64_t value) {
   return PrettyPrinter::Print(value, TUnit::BYTES);
 }
 
@@ -89,6 +89,12 @@ const string POOL_MAX_REQUESTS_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-requests.$0";
 const string POOL_MAX_QUEUED_METRIC_KEY_FORMAT =
   "admission-controller.pool-max-queued.$0";
+const string POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
+  "admission-controller.pool-max-query-mem-limit.$0";
+const string POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
+  "admission-controller.pool-min-query-mem-limit.$0";
+const string POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT =
+  "admission-controller.pool-clamp-mem-limit-query-option.$0";
 
 // Profile query events
 const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission";
@@ -106,14 +112,16 @@ const string PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)";
 const string PROFILE_INFO_KEY_INITIAL_QUEUE_REASON = "Initial admission queue reason";
 const string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON = "waited $0 ms, reason: $1";
 const string PROFILE_INFO_KEY_LAST_QUEUED_REASON = "Latest admission queue reason";
+const string PROFILE_INFO_KEY_ADMITTED_MEM = "Cluster Memory Admitted";
 
 // Error status string details
 const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
-    "minimum memory reservation is greater than memory available to the query "
-    "for buffer reservations. Memory reservation needed given the current plan: $0. Set "
-    "mem_limit to at least $1. Note that changing the mem_limit may also change the "
-    "plan. See the query profile for more information about the per-node memory "
-    "requirements.";
+    "minimum memory reservation is greater than memory available to the query for buffer "
+    "reservations. Memory reservation needed given the current plan: $0. Adjust either "
+    "the mem_limit or the pool config (max-query-mem-limit, min-query-mem-limit) for the "
+    "query to allow the query memory limit to be at least $1. Note that changing the "
+    "mem_limit may also change the plan. See the query profile for more information "
+    "about the per-node memory requirements.";
 const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
     "minimum memory reservation on backend '$0' is greater than memory available to the "
     "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query "
@@ -256,9 +264,10 @@ Status AdmissionController::Init() {
 }
 
 void AdmissionController::PoolStats::Admit(const QuerySchedule& schedule) {
-  int64_t mem_admitted = schedule.GetClusterMemoryEstimate();
-  local_mem_admitted_ += mem_admitted;
-  metrics_.local_mem_admitted->Increment(mem_admitted);
+  int64_t cluster_mem_admitted = schedule.GetClusterMemoryToAdmit();
+  DCHECK_GT(cluster_mem_admitted, 0);
+  local_mem_admitted_ += cluster_mem_admitted;
+  metrics_.local_mem_admitted->Increment(cluster_mem_admitted);
 
   agg_num_running_ += 1;
   metrics_.agg_num_running->Increment(1L);
@@ -270,9 +279,10 @@ void AdmissionController::PoolStats::Admit(const QuerySchedule& schedule) {
 }
 
 void AdmissionController::PoolStats::Release(const QuerySchedule& schedule) {
-  int64_t mem_admitted = schedule.GetClusterMemoryEstimate();
-  local_mem_admitted_ -= mem_admitted;
-  metrics_.local_mem_admitted->Increment(-mem_admitted);
+  int64_t cluster_mem_admitted = schedule.GetClusterMemoryToAdmit();
+  DCHECK_GT(cluster_mem_admitted, 0);
+  local_mem_admitted_ -= cluster_mem_admitted;
+  metrics_.local_mem_admitted->Increment(-cluster_mem_admitted);
 
   agg_num_running_ -= 1;
   metrics_.agg_num_running->Increment(-1L);
@@ -315,6 +325,7 @@ void AdmissionController::PoolStats::Dequeue(const QuerySchedule& schedule,
 
 void AdmissionController::UpdateHostMemAdmitted(const QuerySchedule& schedule,
     int64_t per_node_mem) {
+  DCHECK_NE(per_node_mem, 0);
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
     const string host = TNetworkAddressToString(host_addr);
@@ -326,6 +337,25 @@ void AdmissionController::UpdateHostMemAdmitted(const QuerySchedule& schedule,
   }
 }
 
+bool AdmissionController::CanAccommodateMaxInitialReservation(
+    const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
+    string* mem_unavailable_reason) {
+  const int64_t per_backend_mem_limit = schedule.per_backend_mem_limit();
+  if (per_backend_mem_limit > 0) {
+    const int64_t max_reservation =
+        ReservationUtil::GetReservationLimitFromMemLimit(per_backend_mem_limit);
+    const int64_t largest_min_mem_reservation = schedule.largest_min_reservation();
+    if (largest_min_mem_reservation > max_reservation) {
+      const int64_t required_mem_limit =
+          ReservationUtil::GetMinMemLimitFromReservation(largest_min_mem_reservation);
+      *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
+          PrintBytes(largest_min_mem_reservation), PrintBytes(required_mem_limit));
+      return false;
+    }
+  }
+  return true;
+}
+
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, string* mem_unavailable_reason) {
   const string& pool_name = schedule.request_pool();
@@ -339,17 +369,17 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   //    specified.
   // 2) Each individual backend must have enough mem available within its process limit
   //    to execute the query.
-  int64_t per_node_mem_needed = schedule.GetPerHostMemoryEstimate();
-  int64_t cluster_mem_needed = schedule.GetClusterMemoryEstimate();
+  int64_t per_host_mem_to_admit = schedule.per_backend_mem_to_admit();
+  int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
 
   // Case 1:
   PoolStats* stats = GetPoolStats(pool_name);
   VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString()
-           << " cluster_mem_needed=" << PrintBytes(cluster_mem_needed)
+           << " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit)
            << " pool_max_mem=" << PrintBytes(pool_max_mem);
-  if (stats->EffectiveMemReserved() + cluster_mem_needed > pool_max_mem) {
+  if (stats->EffectiveMemReserved() + cluster_mem_to_admit > pool_max_mem) {
     *mem_unavailable_reason = Substitute(POOL_MEM_NOT_AVAILABLE, pool_name,
-        PrintBytes(pool_max_mem), PrintBytes(cluster_mem_needed),
+        PrintBytes(pool_max_mem), PrintBytes(cluster_mem_to_admit),
         PrintBytes(max(pool_max_mem - stats->EffectiveMemReserved(), 0L)));
     return false;
   }
@@ -364,30 +394,42 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
              << " mem_admitted=" << PrintBytes(mem_admitted)
-             << " needs=" << PrintBytes(per_node_mem_needed)
+             << " needs=" << PrintBytes(per_host_mem_to_admit)
              << " proc_limit=" << PrintBytes(proc_mem_limit);
     int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
-    if (effective_host_mem_reserved + per_node_mem_needed > proc_mem_limit) {
+    if (effective_host_mem_reserved + per_host_mem_to_admit > proc_mem_limit) {
       *mem_unavailable_reason = Substitute(HOST_MEM_NOT_AVAILABLE, host_id,
-          PrintBytes(per_node_mem_needed),
+          PrintBytes(per_host_mem_to_admit),
           PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)),
           PrintBytes(proc_mem_limit));
       return false;
     }
   }
-
+  const TQueryOptions& query_opts = schedule.query_options();
+  if (!query_opts.__isset.buffer_pool_limit || query_opts.buffer_pool_limit <= 0) {
+    // Check if a change in pool_cfg.max_query_mem_limit (while the query was queued)
+    // resulted in a decrease in the computed per_host_mem_limit such that it can no
+    // longer accommodate the largest min_reservation.
+    return CanAccommodateMaxInitialReservation(
+        schedule, pool_cfg, mem_unavailable_reason);
+  }
   return true;
 }
 
 bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, bool admit_from_queue, string* not_admitted_reason) {
+  // Can't admit if:
+  //  (a) Pool configuration is invalid
+  //  (b) There are already queued requests (and this is not admitting from the queue).
+  //  (c) Already at the maximum number of requests
+  //  (d) There are not enough memory resources available for the query
+
+  // Queries from a misconfigured pool will remain queued till they either time out or the
+  // pool config is changed to a valid config.
+  if (!IsPoolConfigValid(pool_cfg, not_admitted_reason)) return false;
+
   const string& pool_name = schedule.request_pool();
   PoolStats* stats = GetPoolStats(pool_name);
-
-  // Can't admit if:
-  //  (a) There are already queued requests (and this is not admitting from the queue).
-  //  (b) Already at the maximum number of requests
-  //  (c) Request will go over the mem limit
   if (!admit_from_queue && stats->local_stats().num_queued > 0) {
     *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY,
         stats->local_stats().num_queued);
@@ -410,6 +452,7 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
   // immediately. The first check that fails is the error that is reported. The order of
   // the checks isn't particularly important, though some thought was given to ordering
   // them in a way that might make the sense for a user.
+  if (!IsPoolConfigValid(pool_cfg, rejection_reason)) return true;
 
   // Compute the max (over all backends) and cluster total (across all backends) for
   // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
@@ -445,18 +488,9 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
           PrintBytes(largest_min_mem_reservation.second));
       return true;
     }
-  } else if (query_opts.__isset.mem_limit && query_opts.mem_limit > 0) {
+  } else if (!CanAccommodateMaxInitialReservation(schedule, pool_cfg, rejection_reason)) {
     // If buffer_pool_limit is not explicitly set, it's calculated from mem_limit.
-    const int64_t mem_limit = query_opts.mem_limit;
-    const int64_t max_reservation =
-        ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
-    if (largest_min_mem_reservation.second > max_reservation) {
-      const int64_t required_mem_limit = ReservationUtil::GetMinMemLimitFromReservation(
-          largest_min_mem_reservation.second);
-      *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(largest_min_mem_reservation.second), PrintBytes(required_mem_limit));
-      return true;
-    }
+    return true;
   }
 
   // Check thread reservation limits.
@@ -464,8 +498,8 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
       && query_opts.thread_reservation_limit > 0
       && max_thread_reservation.second > query_opts.thread_reservation_limit) {
     *rejection_reason = Substitute(REASON_THREAD_RESERVATION_LIMIT_EXCEEDED,
-        TNetworkAddressToString(*max_thread_reservation.first), max_thread_reservation.second,
-        query_opts.thread_reservation_limit);
+        TNetworkAddressToString(*max_thread_reservation.first),
+        max_thread_reservation.second, query_opts.thread_reservation_limit);
     return true;
   }
   if (query_opts.__isset.thread_reservation_aggregate_limit
@@ -495,16 +529,16 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
           PrintBytes(cluster_min_mem_reservation_bytes));
       return true;
     }
-    if (schedule.GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
+    int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
+    if (cluster_mem_to_admit > pool_cfg.max_mem_resources) {
       *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
-          PrintBytes(schedule.GetClusterMemoryEstimate()),
-          PrintBytes(pool_cfg.max_mem_resources));
+          PrintBytes(cluster_mem_to_admit), PrintBytes(pool_cfg.max_mem_resources));
       return true;
     }
-    int64_t perHostMemoryEstimate = schedule.GetPerHostMemoryEstimate();
-    if (perHostMemoryEstimate > min_proc_mem_limit.second) {
+    int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
+    if (per_backend_mem_to_admit > min_proc_mem_limit.second) {
       *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(perHostMemoryEstimate), PrintBytes(min_proc_mem_limit.second),
+          PrintBytes(per_backend_mem_to_admit), PrintBytes(min_proc_mem_limit.second),
           TNetworkAddressToString(*min_proc_mem_limit.first));
       return true;
     }
@@ -525,19 +559,24 @@ void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool
   metrics_.pool_max_mem_resources->SetValue(pool_cfg.max_mem_resources);
   metrics_.pool_max_requests->SetValue(pool_cfg.max_requests);
   metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
+  metrics_.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit);
+  metrics_.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit);
+  metrics_.clamp_mem_limit_query_option->SetValue(
+      pool_cfg.clamp_mem_limit_query_option);
 }
 
-Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
+Status AdmissionController::SubmitForAdmission(QuerySchedule* schedule,
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome) {
   const string& pool_name = schedule->request_pool();
   TPoolConfig pool_cfg;
   RETURN_IF_ERROR(request_pool_service_->GetPoolConfig(pool_name, &pool_cfg));
+  schedule->UpdateMemoryRequirements(pool_cfg);
   const int64_t max_requests = pool_cfg.max_requests;
   const int64_t max_queued = pool_cfg.max_queued;
   const int64_t max_mem = pool_cfg.max_mem_resources;
 
   // Note the queue_node will not exist in the queue when this method returns.
-  QueueNode queue_node(*schedule, admit_outcome, schedule->summary_profile());
+  QueueNode queue_node(schedule, admit_outcome, schedule->summary_profile());
   string not_admitted_reason;
 
   schedule->query_events()->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
@@ -550,8 +589,8 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
     PoolStats* stats = GetPoolStats(pool_name);
     stats->UpdateConfigMetrics(pool_cfg);
     VLOG_QUERY << "Schedule for id=" << PrintId(schedule->query_id()) << " in pool_name="
-               << pool_name << " cluster_mem_needed="
-               << PrintBytes(schedule->GetClusterMemoryEstimate())
+               << pool_name << " per_host_mem_estimate="
+               << PrintBytes(schedule->GetPerHostMemoryEstimate())
                << " PoolConfig: max_requests=" << max_requests << " max_queued="
                << max_queued << " max_mem=" << PrintBytes(max_mem);
     VLOG_QUERY << "Stats: " << stats->DebugString();
@@ -587,10 +626,7 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
         return Status::CANCELLED;
       }
       VLOG_QUERY << "Admitted query id=" << PrintId(schedule->query_id());
-      stats->Admit(*schedule);
-      UpdateHostMemAdmitted(*schedule, schedule->GetPerHostMemoryEstimate());
-      schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-          PROFILE_INFO_VAL_ADMIT_IMMEDIATELY);
+      AdmitQuery(schedule, true);
       VLOG_RPC << "Final: " << stats->DebugString();
       return Status::OK();
     }
@@ -664,8 +700,6 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
     // not change them here.
     DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
     DCHECK(!queue->Contains(&queue_node));
-    schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-        PROFILE_INFO_VAL_ADMIT_QUEUED);
     VLOG_QUERY << "Admitted queued query id=" << PrintId(schedule->query_id());
     VLOG_RPC << "Final: " << stats->DebugString();
     return Status::OK();
@@ -678,7 +712,7 @@ void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) {
     lock_guard<mutex> lock(admission_ctrl_lock_);
     PoolStats* stats = GetPoolStats(pool_name);
     stats->Release(schedule);
-    UpdateHostMemAdmitted(schedule, -schedule.GetPerHostMemoryEstimate());
+    UpdateHostMemAdmitted(schedule, -schedule.per_backend_mem_to_admit());
     pools_for_updates_.insert(pool_name);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
@@ -939,31 +973,31 @@ void AdmissionController::DequeueLoop() {
       while (max_to_dequeue > 0 && !queue.empty()) {
         QueueNode* queue_node = queue.head();
         DCHECK(queue_node != nullptr);
-        const QuerySchedule& schedule = queue_node->schedule;
+        QuerySchedule* schedule = queue_node->schedule;
+        schedule->UpdateMemoryRequirements(pool_config);
         bool is_cancelled = queue_node->admit_outcome->IsSet()
             && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED;
         string not_admitted_reason;
         // TODO: Requests further in the queue may be blocked unnecessarily. Consider a
         // better policy once we have better test scenarios.
         if (!is_cancelled
-            && !CanAdmitRequest(schedule, pool_config, true, &not_admitted_reason)) {
+            && !CanAdmitRequest(*schedule, pool_config, true, &not_admitted_reason)) {
           LogDequeueFailed(queue_node, not_admitted_reason);
           break;
         }
-        VLOG_RPC << "Dequeuing query=" << PrintId(schedule.query_id());
+        VLOG_RPC << "Dequeuing query=" << PrintId(schedule->query_id());
         queue.Dequeue();
         --max_to_dequeue;
-        stats->Dequeue(schedule, false);
+        stats->Dequeue(*schedule, false);
         // If query is already cancelled, just dequeue and continue.
         AdmissionOutcome outcome =
             queue_node->admit_outcome->Set(AdmissionOutcome::ADMITTED);
         if (outcome == AdmissionOutcome::CANCELLED) {
-          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(schedule.query_id());
+          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(schedule->query_id());
           continue;
         }
         DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
-        stats->Admit(schedule);
-        UpdateHostMemAdmitted(schedule, schedule.GetPerHostMemoryEstimate());
+        AdmitQuery(schedule, false);
       }
       pools_for_updates_.insert(pool_name);
     }
@@ -973,7 +1007,7 @@ void AdmissionController::DequeueLoop() {
 void AdmissionController::LogDequeueFailed(QueueNode* node,
     const string& not_admitted_reason) {
   VLOG_QUERY << "Could not dequeue query id="
-             << PrintId(node->schedule.query_id())
+             << PrintId(node->schedule->query_id())
              << " reason: " << not_admitted_reason;
   node->profile->AddInfoString(PROFILE_INFO_KEY_LAST_QUEUED_REASON,
       not_admitted_reason);
@@ -990,6 +1024,42 @@ AdmissionController::GetPoolStats(const string& pool_name) {
   return &it->second;
 }
 
+bool AdmissionController::IsPoolConfigValid(const TPoolConfig& pool_cfg, string* reason) {
+  if (pool_cfg.max_query_mem_limit > 0
+      && pool_cfg.min_query_mem_limit > pool_cfg.max_query_mem_limit) {
+    *reason = Substitute("Invalid pool config: the min_query_mem_limit is greater than "
+                         "the max_query_mem_limit ($0 > $1)",
+        pool_cfg.min_query_mem_limit, pool_cfg.max_query_mem_limit);
+    return false;
+  }
+  if (pool_cfg.max_mem_resources > 0
+      && pool_cfg.min_query_mem_limit > pool_cfg.max_mem_resources) {
+    *reason = Substitute("Invalid pool config: the min_query_mem_limit is greater than "
+                         "the max_mem_resources ($0 > $1)",
+        pool_cfg.min_query_mem_limit, pool_cfg.max_mem_resources);
+    return false;
+  }
+  return true;
+}
+
+void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
+  PoolStats* pool_stats = GetPoolStats(schedule->request_pool());
+  VLOG_RPC << "For Query " << schedule->query_id() << " per_backend_mem_limit set to: "
+           << PrintBytes(schedule->per_backend_mem_limit())
+           << " per_backend_mem_to_admit set to: "
+           << PrintBytes(schedule->per_backend_mem_to_admit());
+  // Update memory accounting.
+  pool_stats->Admit(*schedule);
+  UpdateHostMemAdmitted(*schedule, schedule->per_backend_mem_to_admit());
+  // Update summary profile.
+  const string& admission_result =
+      was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_ADMISSION_RESULT, admission_result);
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_ADMITTED_MEM, PrintBytes(schedule->GetClusterMemoryToAdmit()));
+}
+
 void AdmissionController::PoolStats::InitMetrics() {
   metrics_.total_admitted = parent_->metrics_group_->AddCounter(
       TOTAL_ADMITTED_METRIC_KEY_FORMAT, 0, name_);
@@ -1030,5 +1100,11 @@ void AdmissionController::PoolStats::InitMetrics() {
       POOL_MAX_REQUESTS_METRIC_KEY_FORMAT, 0, name_);
   metrics_.pool_max_queued = parent_->metrics_group_->AddGauge(
       POOL_MAX_QUEUED_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.max_query_mem_limit = parent_->metrics_group_->AddGauge(
+      POOL_MAX_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.min_query_mem_limit = parent_->metrics_group_->AddGauge(
+      POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT, 0, name_);
+  metrics_.clamp_mem_limit_query_option = parent_->metrics_group_->AddProperty<bool>(
+      POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT, false, name_);
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index e5ffd99..b88356c 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -53,10 +53,12 @@ enum class AdmissionOutcome {
 /// on available cluster resources, which are configured in one or more resource pools. A
 /// request will either be admitted for immediate execution, queued for later execution,
 /// or rejected.  Resource pools can be configured to have maximum number of concurrent
-/// queries, maximum memory, and a maximum queue size. Queries will be queued if there
-/// are already too many queries executing or there isn't enough available memory. Once
-/// the queue reaches the maximum queue size, incoming queries will be rejected. Requests
-/// in the queue will time out after a configurable timeout.
+/// queries, maximum cluster wide memory, maximum queue size, max and min per host memory
+/// limit for every query, and to set whether the mem_limit query option will be clamped
+/// by the previously mentioned max/min per host limits or not. Queries will be queued if
+/// there are already too many queries executing or there isn't enough available memory.
+/// Once the queue reaches the maximum queue size, incoming queries will be rejected.
+/// Requests in the queue will time out after a configurable timeout.
 ///
 /// Any impalad can act as a coordinator and thus also an admission controller, so some
 /// cluster state must be shared between impalads in order to make admission decisions on
@@ -88,17 +90,27 @@ enum class AdmissionOutcome {
 ///
 /// The memory required for admission for a request is specified as the query option
 /// MEM_LIMIT (either explicitly or via a default value). This is a per-node value. If
-/// there is no memory limit, the per-node estimate from planning is used instead. The
-/// following two conditions must hold in order for the request to be admitted:
-///  1) There must be enough memory resources available in this resource pool for the
+/// there is no memory limit, the per-node estimate from planning is used instead as a
+/// memory limit and a lower bound is enforced on it based on the largest initial
+/// reservation of the query. The final memory limit used is also clamped by the max/min
+/// memory limits configured for the pool with an option to not enforce these limits on
+/// the MEM_LIMIT query option (If both these max/min limits are not configured, then the
+/// estimates from planning are not used as a memory limit and only used for making
+/// admission decisions. Moreover the estimates will no longer have a lower bound based on
+/// the largest initial reservation).
+/// The following three conditions must hold in order for the request to be admitted:
+///  1) The current pool configuration is valid.
+///  2) There must be enough memory resources available in this resource pool for the
 ///     request. The max memory resources configured for the resource pool specifies the
 ///     aggregate, cluster-wide memory that may be reserved by all executing queries in
 ///     this pool. Thus the aggregate memory to be reserved across all participating
 ///     backends for this request, *plus* that of already admitted requests must be less
 ///     than or equal to the max resources specified.
-///  2) All participating backends must have enough memory available. Each impalad has a
+///  3) All participating backends must have enough memory available. Each impalad has a
 ///     per-process mem limit, and that is the max memory that can be reserved on that
 ///     backend.
+///  4) The final per host memory limit used can accommodate the largest Initial
+///     reservation.
 ///
 /// In order to admit based on these conditions, the admission controller accounts for
 /// the following on both a per-host and per-pool basis:
@@ -117,7 +129,7 @@ enum class AdmissionOutcome {
 ///  b) Mem Admitted: the amount of memory required (i.e. the value used in admission,
 ///     either the mem limit or estimate) for the requests that this impalad's admission
 ///     controller has admitted. Both the per-pool and per-host accounting is updated
-///     when requests are admitted and released (and note: not via the statestore, so
+///     when requests are admitted and released (and NOTE: not via the statestore, so
 ///     there is no latency, but this does not account for memory from requests admitted
 ///     by other impalads).
 ///
@@ -132,42 +144,46 @@ enum class AdmissionOutcome {
 ///
 /// Example:
 /// Consider a 10-node cluster with 100gb/node and a resource pool 'q1' configured with
-/// 500gb of memory. An incoming request with a 40gb MEM_LIMIT and schedule to execute on
-/// all backends is received by AdmitQuery() on an otherwise quiet cluster.
-/// CanAdmitRequest() checks the number of running queries and then calls
+/// 500gb of aggregate memory and 40gb as the max memory limit. An incoming request with
+/// the MEM_LIMIT query option set to 50gb and scheduled to execute on all backends is
+/// received by AdmitQuery() on an otherwise quiet cluster. Based on the pool
+/// configuration, a per host mem limit of 40gb is used for this query and for any
+/// subsequent checks that it needs to pass prior admission. CanAdmitRequest() checks for
+/// a valid pool config and the number of running queries and then calls
 /// HasAvailableMemResources() to check for memory resources. It first checks whether
 /// there is enough memory for the request using PoolStats::EffectiveMemReserved() (which
 /// is the max of the pool's agg_mem_reserved_ and local_mem_admitted_, see #1 above),
-/// and then checks for enough memory on each individual host via the max of the values
-/// in the host_mem_reserved_ and host_mem_admitted_ maps (see #2 above). In this case,
-/// ample resources are available so CanAdmitRequest() returns true.  PoolStats::Admit()
-/// is called to update q1's PoolStats: it first updates agg_num_running_ and
-/// local_mem_admitted_ which are able to be used immediately for incoming admission
-/// requests, then it updates num_admitted_running in the struct sent to the statestore
-/// (local_stats_). UpdateHostMemAdmitted() is called to update the per-host admitted mem
-/// (stored in the map host_mem_admitted_) for all participating hosts. Then AdmitQuery()
-/// returns to the Scheduler. If another identical admission request is received by the
-/// same coordinator immediately, it will be rejected because q1's local_mem_admitted_ is
-/// already 400gb. If that request were sent to another impalad at the same time, it
-/// would have been admitted because not all updates have been disseminated yet. The next
-/// statestore update will contain the updated value of num_admitted_running for q1 on
-/// this backend. As remote fragments begin execution on remote impalads, their pool mem
-/// trackers will reflect the updated amount of memory reserved (set in
-/// local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the next statestore
-/// updates coming from those impalads will send the updated value.  As the statestore
-/// updates are received (in the subscriber callback fn UpdatePoolStats()), the incoming
-/// per-backend, per-pool mem_reserved values are aggregated to
-/// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and
-/// backend_mem_reserved_ (per-host aggregates over all pools). Once this has happened,
-/// any incoming admission request now has the updated state required to make correct
-/// admission decisions.
+/// then checks for enough memory on each individual host via the max of the values in the
+/// host_mem_reserved_ and host_mem_admitted_ maps (see #2 above) and finally checks if
+/// the memory limit used for this query can accommodate its largest initial reservation.
+/// In this case, ample resources are available so CanAdmitRequest() returns true.
+/// PoolStats::Admit() is called to update q1's PoolStats: it first updates
+/// agg_num_running_ and local_mem_admitted_ which are able to be used immediately for
+/// incoming admission requests, then it updates num_admitted_running in the struct sent
+/// to the statestore (local_stats_). UpdateHostMemAdmitted() is called to update the
+/// per-host admitted mem (stored in the map host_mem_admitted_) for all participating
+/// hosts. Then AdmitQuery() returns to the Scheduler. If another identical admission
+/// request is received by the same coordinator immediately, it will be rejected because
+/// q1's local_mem_admitted_ is already 400gb. If that request were sent to another
+/// impalad at the same time, it would have been admitted because not all updates have
+/// been disseminated yet. The next statestore update will contain the updated value of
+/// num_admitted_running for q1 on this backend. As remote fragments begin execution on
+/// remote impalads, their pool mem trackers will reflect the updated amount of memory
+/// reserved (set in local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the
+/// next statestore updates coming from those impalads will send the updated value. As
+/// the statestore updates are received (in the subscriber callback fn UpdatePoolStats()),
+/// the incoming per-backend, per-pool mem_reserved values are aggregated to
+/// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and backend_mem_reserved_
+/// (per-host aggregates over all pools). Once this has happened, any incoming admission
+/// request now has the updated state required to make correct admission decisions.
 ///
 /// Queuing Behavior:
 /// Once the resources in a pool are consumed each coordinator receiving requests will
 /// begin queuing. While each individual queue is FIFO, there is no total ordering on the
 /// queued requests between admission controllers and no FIFO behavior is guaranteed for
 /// requests submitted to different coordinators. When resources become available, there
-/// is no synchronous coordination between nodes used to determine which get to dequeue and
+/// is no synchronous coordination between nodes used to determine which get to dequeue
+/// and
 /// admit requests. Instead, we use a simple heuristic to try to dequeue a number of
 /// requests proportional to the number of requests that are waiting in each individual
 /// admission controller to the total number of requests queued across all admission
@@ -186,10 +202,16 @@ enum class AdmissionOutcome {
 /// proactively cancelled by setting the 'admit_outcome' to AdmissionOutcome::CANCELLED.
 /// This is handled asynchronously by AdmitQuery() and DequeueLoop().
 ///
-/// TODO: Improve the dequeuing policy. IMPALA-2968.
+/// Pool Configuration Mechanism:
+/// The path to pool config files are specified using the startup flags
+/// "fair_scheduler_allocation_path" and "llama_site_path". The format for specifying pool
+/// configs is based on yarn and llama with additions specific to Impala. A file
+/// monitoring service is started that monitors changes made to these files. Those changes
+/// are only propagated to Impala when a new query is serviced. See RequestPoolService
+/// class for more details.
 ///
-/// TODO: Remove less important debug logging after more cluster testing. Should have a
-///       better idea of what is perhaps unnecessary.
+/// TODO: Improve the dequeuing policy. IMPALA-2968.
+
 class AdmissionController {
  public:
   AdmissionController(StatestoreSubscriber* subscriber,
@@ -206,7 +228,7 @@ class AdmissionController {
   /// - Cancelled: <CANCELLED, Status::CANCELLED>
   /// If admitted, ReleaseQuery() should also be called after the query completes or gets
   /// cancelled to ensure that the pool statistics are updated.
-  Status AdmitQuery(QuerySchedule* schedule,
+  Status SubmitForAdmission(QuerySchedule* schedule,
       Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome);
 
   /// Updates the pool statistics when a query completes (either successfully,
@@ -246,10 +268,16 @@ class AdmissionController {
   boost::mutex admission_ctrl_lock_;
 
   /// Maps from host id to memory reserved and memory admitted, both aggregates over all
-  /// pools. See the class doc for a definition of reserved and admitted. Protected by
-  /// admission_ctrl_lock_.
+  /// pools. See the class doc for a detailed definition of reserved and admitted.
+  /// Protected by admission_ctrl_lock_.
   typedef boost::unordered_map<std::string, int64_t> HostMemMap;
+  /// The mem reserved for a query that is currently executing is its memory limit, if set
+  /// (which should be the common case with admission control). Otherwise, if the query
+  /// has no limit or the query is finished executing, the current consumption (tracked
+  /// by its query mem tracker) is used.
   HostMemMap host_mem_reserved_;
+
+  /// The per host mem admitted only for the queries admitted locally.
   HostMemMap host_mem_admitted_;
 
   /// Contains all per-pool statistics and metrics. Accessed via GetPoolStats().
@@ -284,6 +312,9 @@ class AdmissionController {
       IntGauge* pool_max_mem_resources;
       IntGauge* pool_max_requests;
       IntGauge* pool_max_queued;
+      IntGauge* max_query_mem_limit;
+      IntGauge* min_query_mem_limit;
+      BooleanProperty* clamp_mem_limit_query_option;
     };
 
     PoolStats(AdmissionController* parent, const std::string& name)
@@ -390,13 +421,13 @@ class AdmissionController {
   /// during the call to AdmitQuery() but its members live past that and are owned by the
   /// ClientRequestState object associated with them.
   struct QueueNode : public InternalQueue<QueueNode>::Node {
-    QueueNode(const QuerySchedule& query_schedule,
+    QueueNode(QuerySchedule* query_schedule,
         Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome,
         RuntimeProfile* profile)
       : schedule(query_schedule), admit_outcome(admission_outcome), profile(profile) {}
 
     /// The query schedule of the queued request.
-    const QuerySchedule& schedule;
+    QuerySchedule* const schedule;
 
     /// The Admission outcome of the queued request.
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;
@@ -461,6 +492,19 @@ class AdmissionController {
   bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       bool admit_from_queue, std::string* not_admitted_reason);
 
+  /// Returns true if the per host mem limit for the query represented by 'schedule' is
+  /// large enough to accommodate the largest initial reservation required. Otherwise,
+  /// returns false and reports the details of the memory shortage in
+  /// 'mem_unavailable_reason'. Possible cases where it can return false are:
+  /// 1. The pool.max_query_mem_limit is set too low
+  /// 2. mem_limit in query options is set low and no max/min_query_mem_limit is set in
+  ///    the pool configuration.
+  /// 3. mem_limit in query options is set low and min_query_mem_limit is also set low.
+  /// 4. mem_limit in query options is set low and the pool.min_query_mem_limit is set
+  ///    to a higher value but pool.clamp_mem_limit_query_option is false.
+  bool CanAccommodateMaxInitialReservation(const QuerySchedule& schedule,
+      const TPoolConfig& pool_cfg, string* mem_unavailable_reason);
+
   /// Returns true if there is enough memory available to admit the query based on the
   /// schedule, the aggregate pool memory, and the per-host memory. If not, this returns
   /// false and returns the reason in mem_unavailable_reason. Caller owns
@@ -485,6 +529,15 @@ class AdmissionController {
   /// Log the reason for dequeueing of 'node' failing and add the reason to the query's
   /// profile. Must hold admission_ctrl_lock_.
   void LogDequeueFailed(QueueNode* node, const std::string& not_admitted_reason);
+
+  /// Returns false if the pool config is invalid and populates 'reason' with the
+  /// cause of the invalidity.
+  bool IsPoolConfigValid(const TPoolConfig& pool_cfg, std::string* reason);
+
+  /// Sets the per host mem limit and mem admitted in the schedule and does the
+  /// necessary accounting and logging on successful submission.
+  /// Caller must hold 'admission_ctrl_lock_'.
+  void AdmitQuery(QuerySchedule* schedule, bool was_queued);
 };
 
 }
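
For illustration, a minimal standalone sketch of the kind of check that
CanAccommodateMaxInitialReservation() performs. The helper name
MinMemLimitFromReservation and its 25% headroom are stand-ins invented for this
sketch; Impala's ReservationUtil::GetMinMemLimitFromReservation has its own formula.

#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical stand-in: the smallest mem limit that still leaves room for
// 'reservation' bytes of buffer reservations plus some non-reserved overhead.
int64_t MinMemLimitFromReservation(int64_t reservation) {
  return reservation + reservation / 4;  // illustrative 25% headroom only
}

// Reject early when the effective per-host memory limit cannot cover the
// largest initial reservation any backend needs, instead of admitting a query
// that is guaranteed to fail at runtime.
bool CanAccommodate(int64_t per_host_mem_limit, int64_t largest_min_reservation,
                    std::string* reason) {
  if (per_host_mem_limit < 0) return true;  // -1 means no limit imposed
  int64_t required = MinMemLimitFromReservation(largest_min_reservation);
  if (per_host_mem_limit >= required) return true;
  *reason = "mem limit " + std::to_string(per_host_mem_limit) +
      " is below the minimum required " + std::to_string(required);
  return false;
}

int main() {
  std::string reason;
  // A 90MB largest reservation needs more than a 100MB limit under this formula.
  if (!CanAccommodate(100 << 20, 90 << 20, &reason)) {
    std::cout << "rejected: " << reason << "\n";
  }
  return 0;
}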

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 2cb208e..365c981 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -22,6 +22,7 @@
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include "runtime/bufferpool/reservation-util.h"
 #include "util/container-util.h"
 #include "util/mem-info.h"
 #include "util/network-util.h"
@@ -165,37 +166,9 @@ void QuerySchedule::Validate() const {
   // TODO: add validation for BackendExecParams
 }
 
-int64_t QuerySchedule::GetClusterMemoryEstimate() const {
-  DCHECK_GT(per_backend_exec_params_.size(), 0);
-  const int64_t total_cluster_mem =
-      GetPerHostMemoryEstimate() * per_backend_exec_params_.size();
-  DCHECK_GE(total_cluster_mem, 0); // Assume total cluster memory fits in an int64_t.
-  return total_cluster_mem;
-}
-
 int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
-  // Precedence of different estimate sources is:
-  // user-supplied RM query option >
-  //     query option limit >
-  //       estimate >
-  //         server-side defaults
-  int64_t query_option_memory_limit = numeric_limits<int64_t>::max();
-  bool has_query_option = false;
-  if (query_options_.__isset.mem_limit && query_options_.mem_limit > 0) {
-    query_option_memory_limit = query_options_.mem_limit;
-    has_query_option = true;
-  }
-
-  int64_t per_host_mem = 0L;
-  if (has_query_option) {
-    per_host_mem = query_option_memory_limit;
-  } else {
-    DCHECK(request_.__isset.per_host_mem_estimate);
-    per_host_mem = request_.per_host_mem_estimate;
-  }
-  // Cap the memory estimate at the amount of physical memory available. The user's
-  // provided value or the estimate from planning can each be unreasonable.
-  return min(per_host_mem, MemInfo::physical_mem());
+  DCHECK(request_.__isset.per_host_mem_estimate);
+  return request_.per_host_mem_estimate;
 }
 
 TUniqueId QuerySchedule::GetNextInstanceId() {
@@ -250,4 +223,56 @@ int QuerySchedule::GetNumFragmentInstances() const {
   return total;
 }
 
+int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
+  return per_backend_mem_to_admit() * per_backend_exec_params_.size();
+}
+
+void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
+  // If the min_query_mem_limit and max_query_mem_limit are not set in the pool config
+  // then we fall back to the traditional (old) behavior: the mem limit imposed on
+  // the query is the mem_limit query option if set, and unlimited (-1) otherwise,
+  // while the memory used for admission accounting is the mem_limit query option if
+  // set, and otherwise the per host mem estimate calculated during planning.
+  // (See per_backend_mem_limit_ and per_backend_mem_to_admit_ in the header.)
+  bool mimic_old_behaviour =
+      pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
+
+  per_backend_mem_to_admit_ = 0;
+  bool has_query_option = false;
+  if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
+    per_backend_mem_to_admit_ = query_options().mem_limit;
+    has_query_option = true;
+  }
+
+  if (!has_query_option) {
+    per_backend_mem_to_admit_ = GetPerHostMemoryEstimate();
+    if (!mimic_old_behaviour) {
+      int64_t min_mem_limit_required = ReservationUtil::GetMinMemLimitFromReservation(
+          largest_min_reservation());
+      per_backend_mem_to_admit_ = max(per_backend_mem_to_admit_, min_mem_limit_required);
+    }
+  }
+
+  if (!has_query_option || pool_cfg.clamp_mem_limit_query_option) {
+    if (pool_cfg.min_query_mem_limit > 0) {
+      per_backend_mem_to_admit_ =
+          max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
+    }
+    if (pool_cfg.max_query_mem_limit > 0) {
+      per_backend_mem_to_admit_ =
+          min(per_backend_mem_to_admit_, pool_cfg.max_query_mem_limit);
+    }
+  }
+
+  // Cap the memory estimate at the amount of physical memory available. The user's
+  // provided value or the estimate from planning can each be unreasonable.
+  per_backend_mem_to_admit_ = min(per_backend_mem_to_admit_, MemInfo::physical_mem());
+
+  if (mimic_old_behaviour && !has_query_option) {
+    per_backend_mem_limit_ = -1;
+  } else {
+    per_backend_mem_limit_ = per_backend_mem_to_admit_;
+  }
+}
+
 }
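
As a worked example of the precedence that UpdateMemoryRequirements() implements
above, here is a standalone sketch. All names are local stand-ins (values in
bytes); the order is: query option, then the reservation-driven floor, then the
pool min/max clamp, then the physical-memory cap.

#include <algorithm>
#include <cstdint>
#include <iostream>

struct PoolCfg {
  int64_t min_query_mem_limit = 0;  // 0 means unset
  int64_t max_query_mem_limit = 0;  // 0 means unset
  bool clamp_mem_limit_query_option = true;
};

int64_t MemToAdmit(const PoolCfg& pool, int64_t query_option_mem_limit,
    int64_t per_host_estimate, int64_t min_limit_for_reservation,
    int64_t physical_mem) {
  bool mimic_old = pool.min_query_mem_limit == 0 && pool.max_query_mem_limit == 0;
  bool has_option = query_option_mem_limit > 0;
  int64_t mem = has_option ? query_option_mem_limit : per_host_estimate;
  // Without a user-set limit, never admit less than the reservation floor.
  if (!has_option && !mimic_old) mem = std::max(mem, min_limit_for_reservation);
  if (!has_option || pool.clamp_mem_limit_query_option) {
    if (pool.min_query_mem_limit > 0) mem = std::max(mem, pool.min_query_mem_limit);
    if (pool.max_query_mem_limit > 0) mem = std::min(mem, pool.max_query_mem_limit);
  }
  return std::min(mem, physical_mem);  // never admit more than physical memory
}

int main() {
  PoolCfg pool{50 << 20, 1 << 30, true};  // min 50MB, max 1GB, clamping on
  // mem_limit=30m in the query options gets clamped up to the pool minimum.
  std::cout << MemToAdmit(pool, 30 << 20, 0, 0, 1LL << 40) << "\n";  // 52428800
  return 0;
}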

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index b43cc7b..190caee 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -137,8 +137,15 @@ struct FragmentExecParams {
 /// and the granted resource reservation.
 ///
 /// QuerySchedule is a container class for scheduling data, but it doesn't contain
-/// scheduling logic itself. Its state either comes from the static TQueryExecRequest
-/// or is computed by Scheduler.
+/// scheduling logic itself.
+/// The general usage pattern is: part of its state is set from the static
+/// TQueryExecRequest during initialization, then the actual schedule is set by the
+/// scheduler, and finally it is passed to the admission controller, which updates
+/// the memory requirements by calling UpdateMemoryRequirements() on every admission
+/// attempt and sets the final values only once the query is admitted successfully.
+/// Note: due to this usage pattern, the memory requirement values should not be
+/// accessed by other clients of this class while the query is in the admission
+/// control phase.
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
@@ -157,13 +164,8 @@ class QuerySchedule {
   // Valid after Schedule() succeeds.
   const std::string& request_pool() const { return request().query_ctx.request_pool; }
 
-  /// Gets the estimated memory (bytes) per-node. Returns the user specified estimate
-  /// (MEM_LIMIT query parameter) if provided or the estimate from planning if available,
-  /// but is capped at the amount of physical memory to avoid problems if either estimate
-  /// is unreasonably large.
+  /// Returns the estimated memory (bytes) per-node from planning.
   int64_t GetPerHostMemoryEstimate() const;
-  /// Total estimated memory for all nodes. set_num_hosts() must be set before calling.
-  int64_t GetClusterMemoryEstimate() const;
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
@@ -221,10 +223,35 @@ class QuerySchedule {
   RuntimeProfile* summary_profile() { return summary_profile_; }
   RuntimeProfile::EventSequence* query_events() { return query_events_; }
 
+  int64_t largest_min_reservation() const { return largest_min_reservation_; }
+
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t per_backend_mem_limit() const { return per_backend_mem_limit_; }
+
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t per_backend_mem_to_admit() const {
+    DCHECK_GT(per_backend_mem_to_admit_, 0);
+    return per_backend_mem_to_admit_;
+  }
+
   void set_per_backend_exec_params(const PerBackendExecParams& params) {
     per_backend_exec_params_ = params;
   }
 
+  void set_largest_min_reservation(const int64_t largest_min_reservation) {
+    largest_min_reservation_ = largest_min_reservation;
+  }
+
+  /// Returns the cluster-wide memory admitted by the admission controller.
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t GetClusterMemoryToAdmit() const;
+
+  /// Populates or updates the per host query memory limit and the amount of memory to be
+  /// admitted based on the pool configuration passed to it. Must be called at least once
+  /// before making any calls to per_backend_mem_to_admit(), per_backend_mem_limit() and
+  /// GetClusterMemoryToAdmit().
+  void UpdateMemoryRequirements(const TPoolConfig& pool_cfg);
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -259,6 +286,20 @@ class QuerySchedule {
   /// Used to generate consecutive fragment instance ids.
   TUniqueId next_instance_id_;
 
+  /// The largest min memory reservation across all backends. Set in
+  /// Scheduler::Schedule().
+  int64_t largest_min_reservation_ = 0;
+
+  /// The memory limit per backend that will be imposed on the query.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully. -1 means no limit.
+  int64_t per_backend_mem_limit_ = 0;
+
+  /// The per backend memory used for admission accounting.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully.
+  int64_t per_backend_mem_to_admit_ = 0;
+
   /// Populate fragment_exec_params_ from request_.plan_exec_info.
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
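
A small sketch of the call-order contract documented above: the memory
requirement accessors are only meaningful once UpdateMemoryRequirements() has
populated them, which is why the real per_backend_mem_to_admit() DCHECKs that
the value is positive. The class below is a simplified stand-in, not Impala's.

#include <cassert>
#include <cstdint>

class ScheduleSketch {
 public:
  void UpdateMemoryRequirements(int64_t mem_to_admit) {
    per_backend_mem_to_admit_ = mem_to_admit;
  }
  int64_t per_backend_mem_to_admit() const {
    // Mirrors the DCHECK_GT in the real accessor: catches out-of-order use.
    assert(per_backend_mem_to_admit_ > 0 && "call UpdateMemoryRequirements() first");
    return per_backend_mem_to_admit_;
  }

 private:
  int64_t per_backend_mem_to_admit_ = 0;  // 0 until admission control runs
};

int main() {
  ScheduleSketch s;
  s.UpdateMemoryRequirements(512LL << 20);  // e.g. 512MB decided at admission
  return s.per_backend_mem_to_admit() > 0 ? 0 : 1;
}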

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/request-pool-service.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index bad5ac1..f9fdf88 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -195,6 +195,9 @@ Status RequestPoolService::GetPoolConfig(const string& pool_name,
         FLAGS_disable_pool_mem_limits ? -1 : default_pool_mem_limit_);
     pool_config->__set_max_queued(FLAGS_default_pool_max_queued);
     pool_config->__set_default_query_options("");
+    pool_config->__set_min_query_mem_limit(0);
+    pool_config->__set_max_query_mem_limit(0);
+    pool_config->__set_clamp_mem_limit_query_option(true);
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 43bf199..2d51d5c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -787,12 +787,16 @@ void Scheduler::ComputeBackendExecParams(
     }
   }
 
+  int64_t largest_min_reservation = 0;
   for (auto& backend: per_backend_params) {
     const TNetworkAddress& host = backend.first;
     backend.second.proc_mem_limit =
         LookUpBackendDesc(executor_config, host).proc_mem_limit;
+    largest_min_reservation =
+        max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
   }
   schedule->set_per_backend_exec_params(per_backend_params);
+  schedule->set_largest_min_reservation(largest_min_reservation);
 
   stringstream min_mem_reservation_ss;
   for (const auto& e: per_backend_params) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index c19c974..c71429a 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -506,8 +506,9 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
   DebugActionNoFail(schedule_->query_options(), "CRS_BEFORE_ADMISSION");
 
   DCHECK(exec_env_->admission_controller() != nullptr);
-  Status admit_status = ExecEnv::GetInstance()->admission_controller()->AdmitQuery(
-      schedule_.get(), &admit_outcome_);
+  Status admit_status =
+      ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
+          schedule_.get(), &admit_outcome_);
   {
     lock_guard<mutex> l(lock_);
     if (!UpdateQueryStatus(admit_status).ok()) return;
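
The admit_outcome promise passed to SubmitForAdmission() is declared with
PromiseMode::MULTIPLE_PRODUCER, which suggests more than one path (for example
admission and cancellation) may race to set the outcome, with the first write
winning. Under that assumption, here is a standalone sketch of first-writer-wins
semantics; it is a stand-in, not Impala's Promise class.

#include <atomic>
#include <iostream>
#include <thread>

enum class AdmissionOutcome { ADMITTED, REJECTED, TIMED_OUT, CANCELLED };

class MultiProducerOutcome {
 public:
  // Attempts to set the outcome; returns the value that actually won, which
  // may differ from 'v' if another producer got there first.
  AdmissionOutcome Set(AdmissionOutcome v) {
    int expected = kUnset;
    state_.compare_exchange_strong(expected, static_cast<int>(v));
    return static_cast<AdmissionOutcome>(state_.load());
  }

 private:
  static constexpr int kUnset = -1;
  std::atomic<int> state_{kUnset};
};

int main() {
  MultiProducerOutcome outcome;
  std::thread admit([&] { outcome.Set(AdmissionOutcome::ADMITTED); });
  std::thread cancel([&] { outcome.Set(AdmissionOutcome::CANCELLED); });
  admit.join();
  cancel.join();
  // Whichever producer ran first fixed the outcome; the later Set() was a no-op.
  std::cout << static_cast<int>(outcome.Set(AdmissionOutcome::REJECTED)) << "\n";
  return 0;
}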

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index cf8233d..7222dae 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -546,6 +546,10 @@ struct TExecQueryFInstancesParams {
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes. required in V1
   7: optional i64 initial_mem_reservation_total_claims
+
+  // The backend memory limit (in bytes) as set by the admission controller. Used by the
+  // query mem tracker to enforce the memory limit. required in V1
+  8: optional i64 per_backend_mem_limit
 }
 
 struct TExecQueryFInstancesResult {
@@ -773,6 +777,20 @@ struct TPoolConfig {
 
   // Default query options that are applied to requests mapped to this pool.
   5: required string default_query_options;
+
+  // Maximum amount of memory that can be assigned to a query (in bytes).
+  // 0 indicates no limit. If both max_query_mem_limit and min_query_mem_limit are zero
+  // then the admission controller will fall back on old behavior, which is to not set
+  // any backend mem limit if mem_limit is not set in the query options.
+  6: required i64 max_query_mem_limit = 0;
+
+  // Minimum amount of memory that can be assigned to a query (in bytes).
+  // 0 indicates no limit.
+  7: required i64 min_query_mem_limit = 0;
+
+  // If false, the mem_limit query option will not be bounded by the max/min query mem
+  // limits specified for the pool. Default is true.
+  8: required bool clamp_mem_limit_query_option = true;
 }
 
 struct TBloomFilter {

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index c40716a..72fab16 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -50,6 +50,36 @@
     "key": "admission-controller.pool-max-queued.$0"
   },
   {
+    "description": "Resource Pool $0 Max Query Memory Limit",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Max Query Memory Limit",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "admission-controller.pool-max-query-mem-limit.$0"
+  },
+  {
+    "description": "Resource Pool $0 Min Query Memory Limit",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Min Query Memory Limit",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "admission-controller.pool-min-query-mem-limit.$0"
+  },
+  {
+    "description": "If false, the mem_limit query option will not be bounded by the max/min query mem limits specified for the pool",
+    "contexts": [
+      "RESOURCE_POOL"
+    ],
+    "label": "Resource Pool $0 Clamp 'MEM_LIMIT' Query Option Flag",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "admission-controller.pool-clamp-mem-limit-query-option.$0"
+  },
+  {
     "description": "Resource Pool $0 Aggregate Queue Size",
     "contexts": [
       "RESOURCE_POOL"

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index a104399..af9fe7f 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -50,6 +50,7 @@ import org.apache.impala.util.FileWatchService.FileChangeListener;
 import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
 import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
 import org.apache.impala.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -114,6 +115,19 @@ public class RequestPoolService {
   // use this.
   final static String QUERY_OPTIONS_KEY = "impala.admission-control.pool-default-query-options";
 
+  // Keys for the pool max and min query mem limits (in bytes), respectively. These are
+  // specified in the llama-site.xml but are Impala-specific; Llama does not use them.
+  final static String MAX_QUERY_MEM_LIMIT_BYTES =
+      "impala.admission-control.max-query-mem-limit";
+  final static String MIN_QUERY_MEM_LIMIT_BYTES =
+      "impala.admission-control.min-query-mem-limit";
+
+  // Key for specifying if the mem_limit query option can override max/min mem limits
+  // of the pool. This is specified in the llama-site.xml but is Impala-specific and
+  // Llama does not use this.
+  final static String CLAMP_MEM_LIMIT_QUERY_OPTION =
+      "impala.admission-control.clamp-mem-limit-query-option";
+
   // String format for a per-pool configuration key. First parameter is the key for the
   // default, e.g. LLAMA_MAX_PLACED_RESERVATIONS_KEY, and the second parameter is the
   // pool name.
@@ -369,11 +383,17 @@ public class RequestPoolService {
           LLAMA_MAX_QUEUED_RESERVATIONS_DEFAULT));
 
       // Only return positive values. Admission control has a default from gflags.
-      int queueTimeoutMs = getLlamaPoolConfigValue(currentLlamaConf, pool,
-          QUEUE_TIMEOUT_KEY, -1);
+      long queueTimeoutMs = getLlamaPoolConfigValue(currentLlamaConf, pool,
+          QUEUE_TIMEOUT_KEY, -1L);
       if (queueTimeoutMs > 0) result.setQueue_timeout_ms(queueTimeoutMs);
       result.setDefault_query_options(getLlamaPoolConfigValue(currentLlamaConf, pool,
           QUERY_OPTIONS_KEY, ""));
+      result.setMax_query_mem_limit(getLlamaPoolConfigValue(currentLlamaConf, pool,
+          MAX_QUERY_MEM_LIMIT_BYTES, 0L));
+      result.setMin_query_mem_limit(getLlamaPoolConfigValue(currentLlamaConf, pool,
+          MIN_QUERY_MEM_LIMIT_BYTES, 0L));
+      result.setClamp_mem_limit_query_option(getLlamaPoolConfigValue(currentLlamaConf,
+          pool, CLAMP_MEM_LIMIT_QUERY_OPTION, true));
     }
     if (LOG.isTraceEnabled()) {
       LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, max_requests={}, " +
@@ -392,10 +412,10 @@ public class RequestPoolService {
    * @param conf The Configuration to use, provided so the caller can ensure the same
    *        Configuration is used to look up multiple properties.
    */
-  private int getLlamaPoolConfigValue(Configuration conf, String pool, String key,
-      int defaultValue) {
-    return conf.getInt(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
-        conf.getInt(key, defaultValue));
+  private long getLlamaPoolConfigValue(Configuration conf, String pool, String key,
+      long defaultValue) {
+    return conf.getLong(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
+        conf.getLong(key, defaultValue));
   }
 
   /**
@@ -408,6 +428,15 @@ public class RequestPoolService {
   }
 
   /**
+   * Looks up the per-pool Boolean config from the llama Configuration. See above.
+   */
+  private boolean getLlamaPoolConfigValue(Configuration conf, String pool, String key,
+      boolean defaultValue) {
+    return conf.getBoolean(String.format(LLAMA_PER_POOL_CONFIG_KEY_FORMAT, key, pool),
+        conf.getBoolean(key, defaultValue));
+  }
+
+  /**
    * Resolves the actual pool to use via the allocation placement policy. The policy may
    * change the requested pool.
    *
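
The per-pool lookup in getLlamaPoolConfigValue() resolves the pool-scoped key
first ("<key>.<pool>", matching the llama-site keys above), then falls back to
the global key, then to the supplied default. A minimal sketch of that
resolution order, with a plain map standing in for the Hadoop Configuration:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

int64_t GetPoolConfigValue(const std::map<std::string, int64_t>& conf,
    const std::string& pool, const std::string& key, int64_t default_value) {
  // Pool-scoped key takes precedence over the global key.
  auto it = conf.find(key + "." + pool);
  if (it != conf.end()) return it->second;
  it = conf.find(key);
  return it != conf.end() ? it->second : default_value;
}

int main() {
  std::map<std::string, int64_t> conf{
      {"impala.admission-control.max-query-mem-limit.root.queueC", 1000}};
  std::cout << GetPoolConfigValue(conf, "root.queueC",
      "impala.admission-control.max-query-mem-limit", 0) << "\n";  // 1000
  std::cout << GetPoolConfigValue(conf, "root.queueA",
      "impala.admission-control.max-query-mem-limit", 0) << "\n";  // 0 (default)
  return 0;
}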

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index 622c7a9..a66d2dc 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -152,7 +152,7 @@ public class TestRequestPoolService {
   public void testPoolResolution() throws Exception {
     createPoolService(ALLOCATION_FILE, LLAMA_CONFIG_FILE);
     Assert.assertEquals("root.queueA", poolService_.assignToPool("root.queueA", "userA"));
-    Assert.assertNull(poolService_.assignToPool("queueC", "userA"));
+    Assert.assertNull(poolService_.assignToPool("queueD", "userA"));
   }
 
   @Test
@@ -204,6 +204,8 @@ public class TestRequestPoolService {
     checkPoolConfigResult("root.queueA", 10, 30, 1024 * ByteUnits.MEGABYTE,
         10000L, "mem_limit=1024m,query_timeout_s=10");
     checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
+    checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L,
+        "mem_limit=1024m", 1000, 10, false);
   }
 
   @Test
@@ -211,7 +213,7 @@ public class TestRequestPoolService {
     createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY);
     Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA"));
     Assert.assertTrue(poolService_.hasAccess("root.userA", "userA"));
-    checkPoolConfigResult("root", -1, 200, -1);
+    checkPoolConfigResult("root", -1, 200, -1, null, "", 0 ,0, true);
   }
 
   @Ignore("IMPALA-4868") @Test
@@ -306,11 +308,15 @@ public class TestRequestPoolService {
    */
   private void checkPoolConfigResult(String pool, long expectedMaxRequests,
       long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
-      String expectedQueryOptions) {
+      String expectedQueryOptions, long max_query_mem_limit, long min_query_mem_limit,
+      boolean clamp_mem_limit_query_option) {
     TPoolConfig expectedResult = new TPoolConfig();
     expectedResult.setMax_requests(expectedMaxRequests);
     expectedResult.setMax_queued(expectedMaxQueued);
     expectedResult.setMax_mem_resources(expectedMaxMem);
+    expectedResult.setMax_query_mem_limit(max_query_mem_limit);
+    expectedResult.setMin_query_mem_limit(min_query_mem_limit);
+    expectedResult.setClamp_mem_limit_query_option(clamp_mem_limit_query_option);
     if (expectedQueueTimeoutMs != null) {
       expectedResult.setQueue_timeout_ms(expectedQueueTimeoutMs);
     }
@@ -322,6 +328,13 @@ public class TestRequestPoolService {
   }
 
   private void checkPoolConfigResult(String pool, long expectedMaxRequests,
+      long expectedMaxQueued, long expectedMaxMem, Long expectedQueueTimeoutMs,
+      String expectedQueryOptions) {
+    checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
+        expectedMaxMem, expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true);
+  }
+
+  private void checkPoolConfigResult(String pool, long expectedMaxRequests,
       long expectedMaxQueued, long expectedMaxMemUsage) {
     checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued,
         expectedMaxMemUsage, null, "");

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/fair-scheduler-test.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/fair-scheduler-test.xml b/fe/src/test/resources/fair-scheduler-test.xml
index f8536bf..9d3dafd 100644
--- a/fe/src/test/resources/fair-scheduler-test.xml
+++ b/fe/src/test/resources/fair-scheduler-test.xml
@@ -8,6 +8,10 @@
     <queue name="queueB">
       <aclSubmitApps>userB root</aclSubmitApps>
     </queue>
+    <queue name="queueC">
+      <aclSubmitApps>* </aclSubmitApps>
+      <maxResources>1024 mb, 0 vcores</maxResources>
+    </queue>
     <aclSubmitApps> </aclSubmitApps>
   </queue>
   <queuePlacementPolicy>

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/llama-site-test.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/llama-site-test.xml b/fe/src/test/resources/llama-site-test.xml
index e66da83..3738705 100644
--- a/fe/src/test/resources/llama-site-test.xml
+++ b/fe/src/test/resources/llama-site-test.xml
@@ -43,4 +43,16 @@
     <name>impala.admission-control.pool-default-query-options.root.queueA</name>
     <value>mem_limit=1024m,query_timeout_s=10</value>
   </property>
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.queueC</name>
+    <value>1000</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.queueC</name>
+    <value>10</value>
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.queueC</name>
+    <value>false</value>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/mem-limit-test-fair-scheduler.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/mem-limit-test-fair-scheduler.xml b/fe/src/test/resources/mem-limit-test-fair-scheduler.xml
new file mode 100644
index 0000000..3471642
--- /dev/null
+++ b/fe/src/test/resources/mem-limit-test-fair-scheduler.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<allocations>
+  <queue name="root">
+    <queue name="regularPoolWithoutClamping">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="poolLowMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="poolLowMaxLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="regularPool">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="regularPoolNoMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="poolNoMemLimits">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="maxLessThanMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="maxMemLessThanMinLimit">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="invalidTestPool">
+      <aclSubmitApps>*</aclSubmitApps>
+      <maxResources>2500 mb, 0 vcores</maxResources>
+    </queue>
+    <aclSubmitApps> </aclSubmitApps>
+  </queue>
+  <queuePlacementPolicy>
+    <rule name="specified"/>
+    <rule name="default" queue="root.regularPool"/>
+  </queuePlacementPolicy>
+</allocations>

http://git-wip-us.apache.org/repos/asf/impala/blob/fc91e706/fe/src/test/resources/mem-limit-test-llama-site.xml
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/mem-limit-test-llama-site.xml b/fe/src/test/resources/mem-limit-test-llama-site.xml
new file mode 100644
index 0000000..2c48388
--- /dev/null
+++ b/fe/src/test/resources/mem-limit-test-llama-site.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <!--regularPoolWithoutClamping pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.regularPoolWithoutClamping</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.regularPoolWithoutClamping</name>
+    <value>104857600</value><!--100MB-->
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.regularPoolWithoutClamping</name>
+    <value>false</value>
+  </property>
+  <!--poolLowMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.poolLowMinLimit</name>
+    <value>26214400</value><!--25MB-->
+  </property>
+  <!--poolLowMaxLimit pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.poolLowMaxLimit</name>
+    <value>26214400</value><!--25MB-->
+  </property>
+  <!--regularPool pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.regularPool</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.regularPool</name>
+    <value>52428800</value><!--50MB-->
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.regularPool</name>
+    <value>true</value>
+  </property>
+  <!--regularPoolNoMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.regularPoolNoMinLimit</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.regularPoolNoMinLimit</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>impala.admission-control.clamp-mem-limit-query-option.root.regularPoolNoMinLimit</name>
+    <value>true</value>
+  </property>
+  <!--poolNoMemLimits pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.poolNoMemLimits</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.poolNoMemLimits</name>
+    <value>0</value>
+  </property>
+  <!--maxLessThanMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.maxLessThanMinLimit</name>
+    <value>100000</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.maxLessThanMinLimit</name>
+    <value>100001</value>
+  </property>
+  <!--maxMemLessThanMinLimit pool config-->
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.maxMemLessThanMinLimit</name>
+    <value>2621440001</value><!--2500MB + 1B-->
+  </property>
+  <!--invalidTestPool pool config-->
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.root.invalidTestPool</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.root.invalidTestPool</name>
+    <value>26214400</value>
+  </property>
+  <property>
+    <name>llama.am.throttling.maximum.placed.reservations.root.invalidTestPool</name>
+    <value>1</value>
+  </property>
+</configuration>


[6/8] impala git commit: IMPALA-7651: [DOCS] Kudu support to scheduler-related query hints and options

Posted by tm...@apache.org.
IMPALA-7651: [DOCS] Kudu support to scheduler-related query hints and options

The SCHEDULE_RANDOM_REPLICA query option and the RANDOM_REPLICA hint
support Kudu as well as HDFS.

Change-Id: I481d2a002edc1a18491bf9fc249e868005b42fa5
Reviewed-on: http://gerrit.cloudera.org:8080/11584
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Thomas Marshall <th...@cmu.edu>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/23428dc1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/23428dc1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/23428dc1

Branch: refs/heads/master
Commit: 23428dc147fa7ff0b65c4942229e6147bda243d5
Parents: 1914a8b
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Oct 4 15:19:32 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Fri Oct 5 19:36:21 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_hints.xml                   | 47 +++++++-------
 docs/topics/impala_schedule_random_replica.xml | 72 ++++++++-------------
 2 files changed, 51 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/23428dc1/docs/topics/impala_hints.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_hints.xml b/docs/topics/impala_hints.xml
index 6f853c1..1f2f08f 100644
--- a/docs/topics/impala_hints.xml
+++ b/docs/topics/impala_hints.xml
@@ -359,31 +359,32 @@ UPSERT [{ /* +SHUFFLE */ | /* +NOSHUFFLE */ }]
     <p conref="../shared/impala_common.xml#common/kudu_hints"/>
 
     <p rev="IMPALA-2924">
-      <b>Hints for scheduling of HDFS blocks:</b>
+      <b>Hints for scheduling of scan ranges (HDFS data blocks or Kudu
+        tablets)</b>
     </p>
 
-    <p rev="IMPALA-2924">
-      The hints <codeph>/* +SCHEDULE_CACHE_LOCAL */</codeph>, <codeph>/* +SCHEDULE_DISK_LOCAL
-      */</codeph>, and <codeph>/* +SCHEDULE_REMOTE */</codeph> have the same effect as
-      specifying the <codeph>REPLICA_PREFERENCE</codeph> query option with the respective option
-      settings of <codeph>CACHE_LOCAL</codeph>, <codeph>DISK_LOCAL</codeph>, or
-      <codeph>REMOTE</codeph>. The hint <codeph>/* +RANDOM_REPLICA */</codeph> is the same as
-      enabling the <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option.
-    </p>
-
-    <p rev="IMPALA-2924">
-      You can use these hints in combination by separating them with commas, for example,
-      <codeph>/* +SCHEDULE_CACHE_LOCAL,RANDOM_REPLICA */</codeph>. See
-      <xref keyref="replica_preference"/> and <xref keyref="schedule_random_replica"/> for
-      information about how these settings influence the way Impala processes HDFS data blocks.
-    </p>
-
-    <p rev="IMPALA-2924">
-      Specifying the replica preference as a query hint always overrides the query option
-      setting. Specifying either the <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option or
-      the corresponding <codeph>RANDOM_REPLICA</codeph> query hint enables the random
-      tie-breaking behavior when processing data blocks during the query.
-    </p>
+    <p rev="IMPALA-2924"> The hints <codeph>/* +SCHEDULE_CACHE_LOCAL
+      */</codeph>, <codeph>/* +SCHEDULE_DISK_LOCAL */</codeph>, and <codeph>/*
+        +SCHEDULE_REMOTE */</codeph> have the same effect as specifying the
+        <codeph>REPLICA_PREFERENCE</codeph> query option with the respective
+      option settings of <codeph>CACHE_LOCAL</codeph>,
+        <codeph>DISK_LOCAL</codeph>, or <codeph>REMOTE</codeph>. </p>
+    <p rev="IMPALA-2924"> Specifying the replica preference as a query hint
+      always overrides the query option setting. </p>
+    <p rev="IMPALA-2924">The hint <codeph>/* +RANDOM_REPLICA */</codeph> is the
+      same as enabling the <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query
+      option. </p>
+
+    <p rev="IMPALA-2924"> You can use these hints in combination by separating
+      them with commas, for example, <codeph>/*
+        +SCHEDULE_CACHE_LOCAL,RANDOM_REPLICA */</codeph>. See <xref
+        keyref="replica_preference"/> and <xref keyref="schedule_random_replica"
+      /> for information about how these settings influence the way Impala
+      processes HDFS data blocks or Kudu tablets. </p>
+    <p rev="IMPALA-2924">Specifying either the
+        <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option or the
+      corresponding <codeph>RANDOM_REPLICA</codeph> query hint enables the
+      random tie-breaking behavior when processing data blocks during the query. </p>
 
     <p>
       <b>Suggestions versus directives:</b>

http://git-wip-us.apache.org/repos/asf/impala/blob/23428dc1/docs/topics/impala_schedule_random_replica.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_schedule_random_replica.xml b/docs/topics/impala_schedule_random_replica.xml
index ae5978f..f8c50fe 100644
--- a/docs/topics/impala_schedule_random_replica.xml
+++ b/docs/topics/impala_schedule_random_replica.xml
@@ -21,7 +21,13 @@ under the License.
 <concept id="schedule_random_replica" rev="2.5.0">
 
   <title>SCHEDULE_RANDOM_REPLICA Query Option (<keyword keyref="impala25"/> or higher only)</title>
-  <titlealts audience="PDF"><navtitle>SCHEDULE_RANDOM_REPLICA</navtitle></titlealts>
+
+  <titlealts audience="PDF">
+
+    <navtitle>SCHEDULE_RANDOM_REPLICA</navtitle>
+
+  </titlealts>
+
   <prolog>
     <metadata>
       <data name="Category" value="Impala"/>
@@ -34,14 +40,23 @@ under the License.
 
   <conbody>
 
-    <p rev="2.5.0">
-      <indexterm audience="hidden">SCHEDULE_RANDOM_REPLICA query option</indexterm>
+    <p>
+      The <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option fine-tunes the scheduling
+      algorithm for deciding which host processes each HDFS data block or Kudu tablet to reduce
+      the chance of CPU hotspots.
+    </p>
+
+    <p>
+      By default, Impala estimates how much work each host has done for the query, and selects
+      the host that has the lowest workload. This algorithm is intended to reduce CPU hotspots
+      arising when the same host is selected to process multiple data blocks / tablets. Use the
+      <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option if hotspots still arise for some
+      combinations of queries and data layout.
     </p>
 
     <p>
-      The <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option fine-tunes the algorithm for deciding which host
-      processes each HDFS data block. It only applies to tables and partitions that are not enabled
-      for the HDFS caching feature.
+      The <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option only applies to tables and
+      partitions that are not enabled for HDFS caching.
     </p>
 
     <p conref="../shared/impala_common.xml#common/type_boolean"/>
@@ -50,49 +65,16 @@ under the License.
 
     <p conref="../shared/impala_common.xml#common/added_in_250"/>
 
-    <p conref="../shared/impala_common.xml#common/usage_notes_blurb"/>
-
-    <p>
-      In the presence of HDFS cached replicas, Impala randomizes
-      which host processes each cached data block.
-      To ensure that HDFS data blocks are cached on more
-      than one host, use the <codeph>WITH REPLICATION</codeph> clause along with
-      the <codeph>CACHED IN</codeph> clause in a
-      <codeph>CREATE TABLE</codeph> or <codeph>ALTER TABLE</codeph> statement.
-      Specify a replication value greater than or equal to the HDFS block replication factor.
-    </p>
-
-    <p>
-      The <codeph>SCHEDULE_RANDOM_REPLICA</codeph> query option applies to tables and partitions
-      that <i>do not</i> use HDFS caching.
-      By default, Impala estimates how much work each host has done for
-      the query, and selects the host that has the lowest workload.
-      This algorithm is intended to reduce CPU hotspots arising when the
-      same host is selected to process multiple data blocks, but hotspots
-      might still arise for some combinations of queries and data layout.
-      When the <codeph>SCHEDULE_RANDOM_REPLICA</codeph> option is enabled,
-      Impala further randomizes the scheduling algorithm for non-HDFS cached blocks,
-      which can further reduce the chance of CPU hotspots.
-    </p>
-
-    <p rev="IMPALA-2979">
-      This query option works in conjunction with the work scheduling improvements
-      in <keyword keyref="impala25_full"/> and higher. The scheduling improvements
-      distribute the processing for cached HDFS data blocks to minimize hotspots:
-      if a data block is cached on more than one host, Impala chooses which host
-      to process each block based on which host has read the fewest bytes during
-      the current query. Enable <codeph>SCHEDULE_RANDOM_REPLICA</codeph> setting if CPU hotspots
-      still persist because of cases where hosts are <q>tied</q> in terms of
-      the amount of work done; by default, Impala picks the first eligible host
-      in this case.
-    </p>
-
     <p conref="../shared/impala_common.xml#common/related_info"/>
+
     <p>
       <xref href="impala_perf_hdfs_caching.xml#hdfs_caching"/>,
-      <xref href="impala_scalability.xml#scalability_hotspots"/>
-      , <xref href="impala_replica_preference.xml#replica_preference"/>
+      <xref
+        href="impala_scalability.xml#scalability_hotspots"/> ,
+      <xref
+        href="impala_replica_preference.xml#replica_preference"/>
     </p>
 
   </conbody>
+
 </concept>


[7/8] impala git commit: IMPALA-7633: count_user_privilege isn't 0 at the end of test_owner

Posted by tm...@apache.org.
IMPALA-7633: count_user_privilege isn't 0 at the end of test_owner

This patch adds a retry loop to validate the count of user privileges
in a SHOW GRANT USER statement after a DDL operation. The core of the
problem is cache consistency. When a DDL operation is executing, like
drop database, HMS is updated with the correct metadata, and Sentry is
updated to remove the privileges for the dropped object. However, if
a Sentry refresh happens between when HMS is updated
(CatalogOpExecutor:1322) and when the local catalog privileges are
updated (CatalogOpExecutor:1341), then the remove-privilege call will
fail and a log entry with "User does not exist: foo_user" will be
written to the log. The result is that the response back to impalad
with catalog updates will not contain the user and privilege updates.
Ultimately, when the "SHOW GRANT USER" statement is run, it uses the
local impalad catalog, which still contains the privilege because it
has not yet been updated from
statestore. This is not a security problem because the privilege
exists for a maximum of 2s by default, for an object that does not
exist. This is the same result as if the database were dropped from
Hive, except that in that case the privilege can exist for a
nonexistent object for up to 62s by default.

Testing:
- After the retry was added, ran tests until the log entry appeared and
  validated that the test did not fail.

Change-Id: Ifbba0fbd0e24a24b3f2af82ad5209f3fb7fb387b
Reviewed-on: http://gerrit.cloudera.org:8080/11595
Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/37bee3ae
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/37bee3ae
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/37bee3ae

Branch: refs/heads/master
Commit: 37bee3aeca036f3eb96f55425627f02af78a1279
Parents: 23428dc
Author: Adam Holley <gi...@holleyism.com>
Authored: Fri Oct 5 08:53:33 2018 -0500
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 20:43:05 2018 +0000

----------------------------------------------------------------------
 tests/authorization/test_owner_privileges.py | 41 +++++++++++++----------
 1 file changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/37bee3ae/tests/authorization/test_owner_privileges.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_owner_privileges.py b/tests/authorization/test_owner_privileges.py
index ac229ad..a44e3e8 100644
--- a/tests/authorization/test_owner_privileges.py
+++ b/tests/authorization/test_owner_privileges.py
@@ -38,6 +38,8 @@ SENTRY_LONG_POLLING_FREQUENCY_S = 60
 SENTRY_POLLING_FREQUENCY_S = 1
 # The timeout, in seconds, when waiting for a refresh of Sentry privileges.
 SENTRY_REFRESH_TIMEOUT_S = SENTRY_POLLING_FREQUENCY_S * 2
+# The timeout, in seconds, to allow for the statestore to propagate privilege updates.
+STATESTORE_TIMEOUT_S = 3
 
 SENTRY_CONFIG_DIR = getenv('IMPALA_HOME') + '/fe/src/test/resources/'
 SENTRY_BASE_LOG_DIR = getenv('IMPALA_CLUSTER_LOGS_DIR') + "/sentry"
@@ -90,6 +92,15 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         total += 1
     return total
 
+  def _validate_user_privilege_count(self, client, query, user, delay_s, count):
+    start_time = time()
+    while time() - start_time < STATESTORE_TIMEOUT_S:
+      result = self.user_query(client, query, user=user, delay_s=delay_s)
+      if self.count_user_privileges(result) == count:
+        return True
+      sleep(1)
+    return False
+
   def _test_cleanup(self):
     # Admin for manipulation and cleaning up.
     try:
@@ -178,9 +189,8 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
     # Change the database owner and ensure oo_user1 does not have owner privileges.
     self.user_query(self.oo_user1_impalad_client, "alter %s %s set owner user oo_user2"
         % (test_obj.obj_type, test_obj.obj_name), user="oo_user1")
-    result = self.user_query(self.oo_user1_impalad_client, "show grant user oo_user1",
-        user="oo_user1", delay_s=sentry_refresh_timeout_s)
-    assert self.count_user_privileges(result) == 0
+    assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
+        "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
     # Ensure oo_user1 cannot drop database after owner change.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s" % (test_obj.obj_type,
@@ -196,16 +206,14 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
     # privileges on the underlying table.
     self.execute_query("alter %s %s set owner user oo_user1" % (test_obj.obj_type,
         test_obj.obj_name))
-    result = self.user_query(self.oo_user2_impalad_client,
-        "show grant user oo_user2", user="oo_user2", delay_s=sentry_refresh_timeout_s)
-    assert self.count_user_privileges(result) == 0
+    assert self._validate_user_privilege_count(self.oo_user2_impalad_client,
+        "show grant user oo_user2", "oo_user2", sentry_refresh_timeout_s, 0)
     self.user_query(self.oo_user1_impalad_client,
         "alter %s %s set owner role owner_priv_test_owner_role"
         % (test_obj.obj_type, test_obj.obj_name), user="oo_user1")
     # Ensure oo_user1 does not have user privileges.
-    result = self.user_query(self.oo_user1_impalad_client, "show grant user oo_user1",
-        user="oo_user1", delay_s=sentry_refresh_timeout_s)
-    assert self.count_user_privileges(result) == 0
+    assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
+        "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
     # Ensure role has owner privileges.
     self.validate_privileges(self.oo_user1_impalad_client,
@@ -215,9 +223,8 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
     # Drop the object and ensure no role privileges.
     self.user_query(self.oo_user1_impalad_client, "drop %s %s " % (test_obj.obj_type,
         test_obj.obj_name), user="oo_user1")
-    result = self.user_query(self.oo_user1_impalad_client, "show grant role " +
-        "owner_priv_test_owner_role", user="oo_user1", delay_s=sentry_refresh_timeout_s)
-    assert self.count_user_privileges(result) == 0
+    assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
+        "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
     # Ensure user privileges are gone after drop.
     self.user_query(self.oo_user1_impalad_client, "create %s if not exists %s %s %s"
@@ -225,9 +232,8 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
         test_obj.view_select), user="oo_user1")
     self.user_query(self.oo_user1_impalad_client, "drop %s %s " % (test_obj.obj_type,
         test_obj.obj_name), user="oo_user1")
-    result = self.user_query(self.oo_user1_impalad_client, "show grant user oo_user1",
-        user="oo_user1")
-    assert self.count_user_privileges(result) == 0
+    assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
+        "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)
 
   @pytest.mark.execute_serially
   @SentryCacheTestSuite.with_args(
@@ -373,6 +379,5 @@ class TestOwnerPrivileges(SentryCacheTestSuite):
 
     self.user_query(self.oo_user1_impalad_client, "drop %s %s " % (test_obj.obj_type,
         test_obj.obj_name), user="oo_user1")
-    result = self.user_query(self.oo_user1_impalad_client, "show grant user oo_user1",
-        user="oo_user1")
-    assert self.count_user_privileges(result) == 0
+    assert self._validate_user_privilege_count(self.oo_user1_impalad_client,
+        "show grant user oo_user1", "oo_user1", sentry_refresh_timeout_s, 0)


[5/8] impala git commit: IMPALA-7667: sentry.db.explicit.grants.permitted does not accept empty value

Posted by tm...@apache.org.
IMPALA-7667: sentry.db.explicit.grants.permitted does not accept empty value

An empty value in sentry.db.explicit.grants.permitted means that all
privileges are allowed to be explicitly granted. However, a bug in
SENTRY-2424 causes an empty value to be ignored. As a workaround, the
patch uses a single space to mean the same thing as an empty value.
The patch also upgrades CDH_BUILD_NUMBER to 618371 to enable the
sentry.db.explicit.grants.permitted configuration.

Testing:
- Ran all core tests

Change-Id: Iddcb80da6ba37405ce59e2d5cd79916f8eb004ec
Reviewed-on: http://gerrit.cloudera.org:8080/11592
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1914a8b5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1914a8b5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1914a8b5

Branch: refs/heads/master
Commit: 1914a8b5557e835cdf67161af1d12f883dbf25d1
Parents: 3a37c72
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Thu Oct 4 20:53:53 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 19:07:23 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh                                      | 2 +-
 fe/src/test/resources/sentry-site.xml.template            | 6 +++---
 fe/src/test/resources/sentry-site_no_oo.xml.template      | 4 ++--
 fe/src/test/resources/sentry-site_oo.xml.template         | 4 ++--
 fe/src/test/resources/sentry-site_oo_nogrant.xml.template | 4 ++--
 5 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1914a8b5/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b9bfb7d..02e2eeb 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -158,7 +158,7 @@ fi
 : ${CDH_DOWNLOAD_HOST:=native-toolchain.s3.amazonaws.com}
 export CDH_DOWNLOAD_HOST
 export CDH_MAJOR_VERSION=6
-export CDH_BUILD_NUMBER=559250
+export CDH_BUILD_NUMBER=618371
 export IMPALA_HADOOP_VERSION=3.0.0-cdh6.x-SNAPSHOT
 export IMPALA_HBASE_VERSION=2.1.0-cdh6.x-SNAPSHOT
 export IMPALA_HIVE_VERSION=2.1.1-cdh6.x-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/impala/blob/1914a8b5/fe/src/test/resources/sentry-site.xml.template
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/sentry-site.xml.template b/fe/src/test/resources/sentry-site.xml.template
index 60b0da9..597a130 100644
--- a/fe/src/test/resources/sentry-site.xml.template
+++ b/fe/src/test/resources/sentry-site.xml.template
@@ -73,9 +73,9 @@
   <name>sentry.store.jdbc.driver</name>
  <value>org.postgresql.Driver</value>
 </property>
-<!-- Empty means allow all privileges -->
+<!-- Use a space to mean allow all privileges. See SENTRY-2424 -->
 <property>
-<name>sentry.db.explicit.grants.permitted</name>
-<value></value>
+  <name>sentry.db.explicit.grants.permitted</name>
+  <value> </value>
 </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/impala/blob/1914a8b5/fe/src/test/resources/sentry-site_no_oo.xml.template
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/sentry-site_no_oo.xml.template b/fe/src/test/resources/sentry-site_no_oo.xml.template
index 1dbce81..cfebc9a 100644
--- a/fe/src/test/resources/sentry-site_no_oo.xml.template
+++ b/fe/src/test/resources/sentry-site_no_oo.xml.template
@@ -91,10 +91,10 @@
     <name>sentry.hive.testing.mode</name>
     <value>true</value>
   </property>
-  <!-- Empty means allow all privileges -->
+  <!-- Use a space to mean allow all privileges. See SENTRY-2424 -->
   <property>
     <name>sentry.db.explicit.grants.permitted</name>
-    <value></value>
+    <value> </value>
   </property>
   <!-- Custom group mapping for custom cluster tests -->
   <property>

http://git-wip-us.apache.org/repos/asf/impala/blob/1914a8b5/fe/src/test/resources/sentry-site_oo.xml.template
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/sentry-site_oo.xml.template b/fe/src/test/resources/sentry-site_oo.xml.template
index d125933..3cd1b80 100644
--- a/fe/src/test/resources/sentry-site_oo.xml.template
+++ b/fe/src/test/resources/sentry-site_oo.xml.template
@@ -91,10 +91,10 @@
     <name>sentry.hive.testing.mode</name>
     <value>true</value>
   </property>
-  <!-- Empty means allow all privileges -->
+  <!-- Use a space to mean allow all privileges. See SENTRY-2424 -->
   <property>
     <name>sentry.db.explicit.grants.permitted</name>
-    <value></value>
+    <value> </value>
   </property>
   <!-- Custom group mapping for custom cluster tests -->
   <property>

http://git-wip-us.apache.org/repos/asf/impala/blob/1914a8b5/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
----------------------------------------------------------------------
diff --git a/fe/src/test/resources/sentry-site_oo_nogrant.xml.template b/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
index d984b69..7489bf6 100644
--- a/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
+++ b/fe/src/test/resources/sentry-site_oo_nogrant.xml.template
@@ -91,10 +91,10 @@
     <name>sentry.hive.testing.mode</name>
     <value>true</value>
   </property>
-  <!-- Empty means allow all privileges -->
+  <!-- Use a space to mean allow all privileges. See SENTRY-2424 -->
   <property>
     <name>sentry.db.explicit.grants.permitted</name>
-    <value></value>
+    <value> </value>
   </property>
   <!-- Custom group mapping for custom cluster tests -->
   <property>


[8/8] impala git commit: IMPALA-7660: Support ECDH ciphers for debug webserver

Posted by tm...@apache.org.
IMPALA-7660: Support ECDH ciphers for debug webserver

A recent change (IMPALA-7519) added support for ECDH ciphers for the
beeswax/hs2 server. This patch pulls in a recent change on squeasel to
extend that support to the debug webserver.

It also fixes a bug that prevented start-impala-cluster.py from
completing successfully when the webserver is launched with ssl:
the script tried to verify the availability of the webserver over http.

Testing:
- Added a custom cluster test that verifies start-impala-cluster.py
  runs successfully with webserver ssl enabled.
- Added the webserver to an existing test for ECDH ciphers.

Change-Id: I80a6b370d5860812cde13229b5bcb2977814c73c
Reviewed-on: http://gerrit.cloudera.org:8080/11585
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0e1de31b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0e1de31b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0e1de31b

Branch: refs/heads/master
Commit: 0e1de31ba56bdac73b8db5c5ff316334c84725d9
Parents: 37bee3a
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Oct 1 22:04:05 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 5 21:16:17 2018 +0000

----------------------------------------------------------------------
 be/src/thirdparty/squeasel/squeasel.c   | 36 ++++++++++++++++++++++++----
 tests/common/impala_cluster.py          | 15 ++++++++----
 tests/common/impala_service.py          | 26 +++++++++++++-------
 tests/custom_cluster/test_client_ssl.py | 34 +++++++++++++++++++++-----
 4 files changed, 86 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0e1de31b/be/src/thirdparty/squeasel/squeasel.c
----------------------------------------------------------------------
diff --git a/be/src/thirdparty/squeasel/squeasel.c b/be/src/thirdparty/squeasel/squeasel.c
index 2149497..045740d 100644
--- a/be/src/thirdparty/squeasel/squeasel.c
+++ b/be/src/thirdparty/squeasel/squeasel.c
@@ -4298,11 +4298,37 @@ static int set_ssl_option(struct sq_context *ctx) {
     (void) SSL_CTX_use_certificate_chain_file(ctx->ssl_ctx, pem);
   }
 
-  if (ctx->config[SSL_CIPHERS] != NULL &&
-      (SSL_CTX_set_cipher_list(ctx->ssl_ctx, ctx->config[SSL_CIPHERS]) == 0)) {
-    cry(fc(ctx), "SSL_CTX_set_cipher_list: error setting ciphers (%s): %s",
-        ctx->config[SSL_CIPHERS], ssl_error());
-    return 0;
+  if (ctx->config[SSL_CIPHERS] != NULL) {
+    if (SSL_CTX_set_cipher_list(ctx->ssl_ctx, ctx->config[SSL_CIPHERS]) == 0) {
+      cry(fc(ctx), "SSL_CTX_set_cipher_list: error setting ciphers (%s): %s",
+          ctx->config[SSL_CIPHERS], ssl_error());
+      return 0;
+    }
+#ifndef OPENSSL_NO_ECDH
+#if OPENSSL_VERSION_NUMBER < 0x10002000L
+    // OpenSSL 1.0.1 and below only support setting a single ECDH curve at once.
+    // We choose prime256v1 because it's the first curve listed in the "modern
+    // compatibility" section of the Mozilla Server Side TLS recommendations,
+    // accessed Feb. 2017.
+    EC_KEY* ecdh = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1);
+    if (ecdh == NULL) {
+      cry(fc(ctx), "EC_KEY_new_by_curve_name: %s", ssl_error());
+    }
+
+    int rc = SSL_CTX_set_tmp_ecdh(ctx->ssl_ctx, ecdh);
+    if (rc <= 0) {
+      cry(fc(ctx), "SSL_CTX_set_tmp_ecdh: %s", ssl_error());
+    }
+#elif OPENSSL_VERSION_NUMBER < 0x10100000L
+    // OpenSSL 1.0.2 provides the set_ecdh_auto API which internally figures out
+    // the best curve to use.
+    int rc = SSL_CTX_set_ecdh_auto(ctx->ssl_ctx, 1);
+    if (rc <= 0) {
+      cry(fc(ctx), "SSL_CTX_set_ecdh_auto: %s", ssl_error());
+    }
+#endif
+#endif
+
   }
 
   return 1;

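For readers who do not live in the OpenSSL C API, the version split
above can be sketched with Python's standard ssl module (an analogue
for illustration only, not part of the patch; curve and cipher names
are taken from the comments in the hunk above):

    import ssl

    # Server-side context; on OpenSSL >= 1.0.2 suitable ECDH parameters
    # are negotiated automatically (the set_ecdh_auto branch above).
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ctx.set_ciphers("ECDHE-RSA-AES128-GCM-SHA256")

    # On OpenSSL 1.0.1 and below a single curve must be pinned, which is
    # what the EC_KEY_new_by_curve_name(NID_X9_62_prime256v1) branch does.
    ctx.set_ecdh_curve("prime256v1")
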
http://git-wip-us.apache.org/repos/asf/impala/blob/0e1de31b/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index f25c8ed..05ff172 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -222,6 +222,10 @@ class BaseImpalaProcess(Process):
   def _get_webserver_port(self, default=None):
     return int(self._get_arg_value('webserver_port', default))
 
+  def _get_webserver_ssl(self):
+    """Returns true if the webserver is being run with ssl."""
+    return self._get_arg_value("webserver_certificate_file", "") != ""
+
   def _get_arg_value(self, arg_name, default=None):
     """Gets the argument value for given argument name"""
     for arg in self.cmd:
@@ -239,7 +243,8 @@ class ImpaladProcess(BaseImpalaProcess):
     self.service = ImpaladService(self.hostname, self._get_webserver_port(default=25000),
                                   self.__get_beeswax_port(default=21000),
                                   self.__get_be_port(default=22000),
-                                  self.__get_hs2_port(default=21050))
+                                  self.__get_hs2_port(default=21050),
+                                  self._get_webserver_ssl())
 
   def __get_beeswax_port(self, default=None):
     return int(self._get_arg_value('beeswax_port', default))
@@ -264,16 +269,16 @@ class ImpaladProcess(BaseImpalaProcess):
 class StateStoreProcess(BaseImpalaProcess):
   def __init__(self, cmd):
     super(StateStoreProcess, self).__init__(cmd, socket.gethostname())
-    self.service =\
-        StateStoredService(self.hostname, self._get_webserver_port(default=25010))
+    self.service = StateStoredService(
+        self.hostname, self._get_webserver_port(default=25010), self._get_webserver_ssl())
 
 
 # Represents a catalogd process
 class CatalogdProcess(BaseImpalaProcess):
   def __init__(self, cmd):
     super(CatalogdProcess, self).__init__(cmd, socket.gethostname())
-    self.service = CatalogdService(self.hostname,
-        self._get_webserver_port(default=25020), self.__get_port(default=26000))
+    self.service = CatalogdService(self.hostname, self._get_webserver_port(default=25020),
+        self._get_webserver_ssl(), self.__get_port(default=26000))
 
   def __get_port(self, default=None):
     return int(self._get_arg_value('catalog_service_port', default))

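The new _get_webserver_ssl() hook simply treats the presence of a
non-empty --webserver_certificate_file flag on the daemon's command
line as "ssl is on". A standalone sketch of that flag scan (names here
are illustrative; Impala's _get_arg_value is more lenient about flag
syntax):

    def get_arg_value(cmd_args, arg_name, default=None):
        """Return the value of --arg_name=value from a list of arguments."""
        prefix = "--%s=" % arg_name
        for arg in cmd_args:
            if arg.startswith(prefix):
                return arg[len(prefix):]
        return default

    cmd = ["--webserver_port=25000",
           "--webserver_certificate_file=/tmp/server-cert.pem"]
    assert get_arg_value(cmd, "webserver_certificate_file", "") != ""
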
http://git-wip-us.apache.org/repos/asf/impala/blob/0e1de31b/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 0ad4496..a99e0ce 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -22,6 +22,7 @@
 import json
 import logging
 import re
+import ssl
 import urllib
 from time import sleep, time
 
@@ -41,17 +42,24 @@ LOG.setLevel(level=logging.DEBUG)
 # Base class for all Impala services
 # TODO: Refactor the retry/timeout logic into a common place.
 class BaseImpalaService(object):
-  def __init__(self, hostname, webserver_port):
+  def __init__(self, hostname, webserver_port, webserver_ssl=False):
     self.hostname = hostname
     self.webserver_port = webserver_port
+    self.webserver_ssl = webserver_ssl
 
   def open_debug_webpage(self, page_name, timeout=10, interval=1):
     start_time = time()
 
     while (time() - start_time < timeout):
       try:
-        return urllib.urlopen("http://%s:%d/%s" %
-            (self.hostname, int(self.webserver_port), page_name))
+        protocol = "http"
+        context = None
+        if self.webserver_ssl:
+          protocol = "https"
+          context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        url = "%s://%s:%d/%s" % \
+            (protocol, self.hostname, int(self.webserver_port), page_name)
+        return urllib.urlopen(url, context=context)
       except Exception:
         LOG.info("Debug webpage not yet available.")
       sleep(interval)
@@ -166,8 +174,8 @@ class BaseImpalaService(object):
 # new connections or accessing the debug webpage.
 class ImpaladService(BaseImpalaService):
   def __init__(self, hostname, webserver_port=25000, beeswax_port=21000, be_port=22000,
-               hs2_port=21050):
-    super(ImpaladService, self).__init__(hostname, webserver_port)
+               hs2_port=21050, webserver_ssl=False):
+    super(ImpaladService, self).__init__(hostname, webserver_port, webserver_ssl)
     self.beeswax_port = beeswax_port
     self.be_port = be_port
     self.hs2_port = hs2_port
@@ -307,8 +315,8 @@ class ImpaladService(BaseImpalaService):
 # Allows for interacting with the StateStore service to perform operations such as
 # accessing the debug webpage.
 class StateStoredService(BaseImpalaService):
-  def __init__(self, hostname, webserver_port):
-    super(StateStoredService, self).__init__(hostname, webserver_port)
+  def __init__(self, hostname, webserver_port, webserver_ssl):
+    super(StateStoredService, self).__init__(hostname, webserver_port, webserver_ssl)
 
   def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1):
     self.wait_for_metric_value('statestore.live-backends', num_subscribers,
@@ -318,8 +326,8 @@ class StateStoredService(BaseImpalaService):
 # Allows for interacting with the Catalog service to perform operations such as
 # accessing the debug webpage.
 class CatalogdService(BaseImpalaService):
-  def __init__(self, hostname, webserver_port, service_port):
-    super(CatalogdService, self).__init__(hostname, webserver_port)
+  def __init__(self, hostname, webserver_port, webserver_ssl, service_port):
+    super(CatalogdService, self).__init__(hostname, webserver_port, webserver_ssl)
     self.service_port = service_port
 
   def get_catalog_version(self, timeout=10, interval=1):

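The heart of the open_debug_webpage() change is scheme selection plus
an SSLContext that skips CA verification, so the availability probe
accepts the self-signed test certificate. A condensed sketch of the
pattern, minus the retry loop (assumes Python 2.7.9+, where
urllib.urlopen grew the context keyword):

    import ssl
    import urllib

    def probe_webpage(hostname, port, page_name, use_ssl):
        protocol, context = "http", None
        if use_ssl:
            protocol = "https"
            # Negotiates the best available TLS version; no CA bundle is
            # loaded, so a self-signed certificate is accepted.
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        url = "%s://%s:%d/%s" % (protocol, hostname, int(port), page_name)
        return urllib.urlopen(url, context=context)
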
http://git-wip-us.apache.org/repos/asf/impala/blob/0e1de31b/tests/custom_cluster/test_client_ssl.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index b6f7e04..7bbc2df 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -19,6 +19,7 @@
 import logging
 import os
 import pytest
+import requests
 import signal
 import ssl
 import socket
@@ -110,13 +111,27 @@ class TestClientSsl(CustomClusterTestSuite):
     assert "Query Status: Cancelled" in result.stdout
     assert impalad.wait_for_num_in_flight_queries(0)
 
+  WEBSERVER_SSL_ARGS = ("--webserver_certificate_file=%(cert_dir)s/server-cert.pem "
+                        "--webserver_private_key_file=%(cert_dir)s/server-key.pem "
+                        % {'cert_dir': CERT_DIR})
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args=WEBSERVER_SSL_ARGS,
+                                    statestored_args=WEBSERVER_SSL_ARGS,
+                                    catalogd_args=WEBSERVER_SSL_ARGS)
+  def test_webserver_ssl(self):
+    "Tests that the debug web pages are reachable when run with ssl."
+    self._verify_ssl_webserver()
+
   # Test that the shell can connect to a ECDH only cluster.
-  TLS_ECDH_ARGS = ("--ssl_client_ca_certificate=%s/server-cert.pem "
-                  "--ssl_server_certificate=%s/server-cert.pem "
-                  "--ssl_private_key=%s/server-key.pem "
-                  "--hostname=localhost "  # Required to match hostname in certificate"
-                  "--ssl_cipher_list=ECDHE-RSA-AES128-GCM-SHA256 "
-                  % (CERT_DIR, CERT_DIR, CERT_DIR))
+  TLS_ECDH_ARGS = ("--ssl_client_ca_certificate=%(cert_dir)s/server-cert.pem "
+                   "--ssl_server_certificate=%(cert_dir)s/server-cert.pem "
+                   "--ssl_private_key=%(cert_dir)s/server-key.pem "
+                   "--hostname=localhost "  # Required to match hostname in certificate"
+                   "--ssl_cipher_list=ECDHE-RSA-AES128-GCM-SHA256 "
+                   "--webserver_certificate_file=%(cert_dir)s/server-cert.pem "
+                   "--webserver_private_key_file=%(cert_dir)s/server-key.pem "
+                   % {'cert_dir': CERT_DIR})
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=TLS_ECDH_ARGS,
@@ -128,6 +143,7 @@ class TestClientSsl(CustomClusterTestSuite):
   def test_tls_ecdh(self, vector):
     self._verify_negative_cases()
     self._validate_positive_cases("%s/server-cert.pem" % self.CERT_DIR)
+    self._verify_ssl_webserver()
 
   # Test that the shell can connect to a TLS1.2 only cluster, and for good measure
   # restrict the cipher suite to just one choice.
@@ -209,3 +225,9 @@ class TestClientSsl(CustomClusterTestSuite):
       result = run_impala_shell_cmd(shell_options)
       for msg in [self.SSL_ENABLED, self.CONNECTED, self.FETCHED]:
         assert msg in result.stderr
+
+  def _verify_ssl_webserver(self):
+    for port in ["25000", "25010", "25020"]:
+      url = "https://localhost:%s" % port
+      response = requests.get(url, verify="%s/server-cert.pem" % self.CERT_DIR)
+      assert response.status_code == requests.codes.ok, url
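
_verify_ssl_webserver() leans on requests doing real certificate
validation: verify= points at the self-signed cert the daemons were
started with, standing in for a CA bundle. A usage sketch of the
positive and negative cases (the cert path is a placeholder):

    import requests

    CERT = "/path/to/server-cert.pem"  # placeholder for CERT_DIR's cert

    # Succeeds: the server's own cert acts as the trust anchor.
    response = requests.get("https://localhost:25000", verify=CERT)
    assert response.status_code == requests.codes.ok

    # Fails: without verify=, requests consults the default CA bundle,
    # which does not trust the self-signed certificate.
    try:
        requests.get("https://localhost:25000")
    except requests.exceptions.SSLError:
        pass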