Posted to commits@impala.apache.org by st...@apache.org on 2019/08/01 20:16:15 UTC

[impala] branch master updated (615a821 -> 48bb93d)

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 615a821  IMPALA-8820: fix start-impala-cluster catalogd startup
     new 699450a  IMPALA-8779, IMPALA-8780: RowBatchQueue re-factoring and BufferedPRS impl
     new a0b0fd4  IMPALA-7486: Add specialized estimation scheme for dedicated coordinators
     new 2d81965  IMPALA-8600: Refresh transactional tables
     new e8bd307  IMPALA-8812: [DOCS] Negative index support in SPLIT_PART function
     new 48bb93d  IMPALA-8636: fix flakiness of ACID INSERT tests

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/blocking-plan-root-sink.cc             |  15 +-
 be/src/exec/blocking-plan-root-sink.h              |   3 -
 be/src/exec/buffered-plan-root-sink.cc             | 126 +++++++++++-
 be/src/exec/buffered-plan-root-sink.h              |  53 ++++-
 be/src/exec/data-sink.cc                           |   1 +
 be/src/exec/hdfs-scan-node.cc                      |   4 +-
 be/src/exec/kudu-scan-node.cc                      |   4 +-
 be/src/exec/plan-root-sink.cc                      |  15 ++
 be/src/exec/plan-root-sink.h                       |  10 +
 be/src/exec/scan-node.cc                           |  10 +-
 be/src/exec/scan-node.h                            |   6 +-
 be/src/exec/scanner-context.cc                     |   1 -
 be/src/runtime/CMakeLists.txt                      |   3 +-
 ...-batch-queue.cc => blocking-row-batch-queue.cc} |  39 +++-
 be/src/runtime/blocking-row-batch-queue.h          | 114 +++++++++++
 be/src/runtime/coordinator.cc                      |   8 +-
 be/src/runtime/deque-row-batch-queue.cc            |  66 ++++++
 be/src/runtime/deque-row-batch-queue.h             |  70 +++++++
 be/src/runtime/row-batch-queue.h                   |  80 --------
 be/src/scheduling/admission-controller-test.cc     | 226 ++++++++++++++++++++-
 be/src/scheduling/admission-controller.cc          | 146 ++++++++-----
 be/src/scheduling/admission-controller.h           |  22 +-
 be/src/scheduling/query-schedule.cc                |  84 ++++++--
 be/src/scheduling/query-schedule.h                 |  70 ++++++-
 be/src/scheduling/scheduler.cc                     |  11 +-
 be/src/service/impala-http-handler.cc              |  17 +-
 be/src/util/backend-gflag-util.cc                  |   8 +-
 be/src/util/blocking-queue.h                       |  47 ++---
 common/thrift/BackendGflags.thrift                 |   4 +
 common/thrift/Frontend.thrift                      |   5 +
 docs/topics/impala_string_functions.xml            | 115 ++++-------
 .../apache/impala/analysis/ResetMetadataStmt.java  |  14 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |   7 +
 .../impala/catalog/events/MetastoreEvents.java     | 185 ++++++++++-------
 .../org/apache/impala/planner/PlanFragment.java    |   4 +
 .../java/org/apache/impala/planner/Planner.java    |  37 +++-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../apache/impala/service/CatalogOpExecutor.java   |  52 ++++-
 .../java/org/apache/impala/service/Frontend.java   |   9 +-
 .../java/org/apache/impala/util/AcidUtils.java     |   2 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   9 +
 .../functional-query/queries/QueryTest/acid.test   |   8 +
 .../QueryTest/dedicated-coord-mem-estimates.test   |  96 +++++++++
 tests/custom_cluster/test_admission_controller.py  | 168 ++++++++++++++-
 tests/custom_cluster/test_event_processing.py      |  28 ++-
 tests/query_test/test_insert.py                    |   2 +
 tests/query_test/test_result_spooling.py           |   7 +-
 www/admission_controller.tmpl                      |  16 +-
 48 files changed, 1595 insertions(+), 436 deletions(-)
 rename be/src/runtime/{row-batch-queue.cc => blocking-row-batch-queue.cc} (51%)
 create mode 100644 be/src/runtime/blocking-row-batch-queue.h
 create mode 100644 be/src/runtime/deque-row-batch-queue.cc
 create mode 100644 be/src/runtime/deque-row-batch-queue.h
 delete mode 100644 be/src/runtime/row-batch-queue.h
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test


[impala] 02/05: IMPALA-7486: Add specialized estimation scheme for dedicated coordinators

Posted by st...@apache.org.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a0b0fd4fce7676255dda52445cb972424f0b7c26
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Wed Jun 19 13:53:18 2019 -0700

    IMPALA-7486: Add specialized estimation scheme for dedicated coordinators
    
    This patch computes two memory estimates in the frontend:
    an estimate for any host that is an executor (including
    a combined coordinator and executor) and an estimate
    for a dedicated coordinator. Both estimates are computed
    regardless of whether the coordinator is actually dedicated.
    
    Admission control then, in the case when the coordinator
    is dedicated, uses the coordinator memory estimate for
    the coordinator node and the executor memory estimate
    for all other nodes.
    
    Other highlights:
     - if MEM_LIMIT query option is set, it is applied to all backends,
       both executors and coordinators.
     - the min_query_mem_limit pool config is not enforced on the
       dedicated coordinator estimates unless MEM_LIMIT query option is set.
     - the lower cap on estimates and the admission checks based on the
       min mem limit required for reservation are applied separately on
       coordinator's and executors' mem requirements.
     - Added a hidden startup option 'use_dedicated_coordinator_estimates'
       which, if set to false, reverts to the previous estimation behavior.
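
The selection rules above can be sketched as a small standalone function. This is a minimal illustration, not the code in query-schedule.cc: the struct `EstimateInputs`, its fields, and the function `MemToAdmit` are hypothetical names, and the sketch assumes that a value <= 0 means "unset". It deliberately leaves out max_query_mem_limit and the lower cap derived from minimum reservations.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical inputs for illustration only; these are not Impala types.
struct EstimateInputs {
  int64_t mem_limit_option;     // MEM_LIMIT query option, <= 0 assumed to mean unset
  int64_t executor_estimate;    // frontend estimate for any executor host
  int64_t coord_estimate;       // frontend estimate for a dedicated coordinator
  int64_t min_query_mem_limit;  // pool config, <= 0 assumed to mean unset
  bool dedicated_coord;         // true if the coordinator runs no executor work
};

// Returns the memory to admit on one backend under the simplified rules:
//  - MEM_LIMIT, when set, applies to every backend.
//  - A dedicated coordinator uses its own estimate, and the pool's
//    min_query_mem_limit is not enforced on it.
//  - Every other backend uses the executor estimate, raised to
//    min_query_mem_limit when that is set.
int64_t MemToAdmit(const EstimateInputs& in, bool is_coord_backend) {
  if (in.mem_limit_option > 0) return in.mem_limit_option;
  if (is_coord_backend && in.dedicated_coord) return in.coord_estimate;
  int64_t mem = in.executor_estimate;
  if (in.min_query_mem_limit > 0) mem = std::max(mem, in.min_query_mem_limit);
  return mem;
}
```

With a 512 MB executor estimate, a 150 MB coordinator estimate, and a 700 MB min_query_mem_limit, this sketch admits 700 MB on executors and 150 MB on a dedicated coordinator, which mirrors the values asserted in the DedicatedCoordQuerySchedule unit test in this patch.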
    
    Testing:
    - Added unit test for admission/rejection in dedicated
      coordinator clusters.
    - Added end to end tests.
    
    Change-Id: I2b94e7293b91dec8a18491079c34923eadd94b21
    Reviewed-on: http://gerrit.cloudera.org:8080/13740
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc                      |   8 +-
 be/src/scheduling/admission-controller-test.cc     | 226 ++++++++++++++++++++-
 be/src/scheduling/admission-controller.cc          | 146 ++++++++-----
 be/src/scheduling/admission-controller.h           |  22 +-
 be/src/scheduling/query-schedule.cc                |  84 ++++++--
 be/src/scheduling/query-schedule.h                 |  70 ++++++-
 be/src/scheduling/scheduler.cc                     |  11 +-
 be/src/service/impala-http-handler.cc              |  17 +-
 be/src/util/backend-gflag-util.cc                  |   8 +-
 common/thrift/BackendGflags.thrift                 |   4 +
 common/thrift/Frontend.thrift                      |   5 +
 .../org/apache/impala/planner/PlanFragment.java    |   4 +
 .../java/org/apache/impala/planner/Planner.java    |  37 +++-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../java/org/apache/impala/service/Frontend.java   |   9 +-
 .../QueryTest/dedicated-coord-mem-estimates.test   |  96 +++++++++
 tests/custom_cluster/test_admission_controller.py  | 168 ++++++++++++++-
 www/admission_controller.tmpl                      |  16 +-
 18 files changed, 820 insertions(+), 115 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index d4ce820..bd4fb75 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,8 +111,14 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
+  // If there is no coord fragment then pick the mem limit computed for an executor.
+  // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
+  // fragments are scheduled to run on the coordinator backend.
+  int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
+      schedule_.coord_backend_mem_limit() :
+      schedule_.per_backend_mem_limit();
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx(), schedule_.per_backend_mem_limit());
+      query_ctx(), coord_mem_limit);
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 8e8498a..a241525 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -20,6 +20,7 @@
 #include "common/names.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/logging_test_util.h"
+#include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
@@ -50,6 +51,7 @@ static const string HOST_1 = "host1:25000";
 static const string HOST_2 = "host2:25000";
 
 static const int64_t MEGABYTE = 1024L * 1024L;
+static const int64_t GIGABYTE = 1024L * MEGABYTE;
 
 /// Parent class for Admission Controller tests.
 /// Common code and constants should go here.
@@ -80,18 +82,22 @@ class AdmissionControllerTest : public testing::Test {
 
   /// Make a QuerySchedule with dummy parameters that can be used to test admission and
   /// rejection in AdmissionController.
-  QuerySchedule* MakeQuerySchedule(string request_pool_name, TPoolConfig& config,
-      const int num_hosts, const int per_host_mem_estimate) {
+  QuerySchedule* MakeQuerySchedule(string request_pool_name, int64_t mem_limit,
+      TPoolConfig& config, const int num_hosts, const int per_host_mem_estimate,
+      const int coord_mem_estimate, bool is_dedicated_coord) {
     DCHECK_GT(num_hosts, 0);
     TQueryExecRequest* request = pool_.Add(new TQueryExecRequest());
     request->query_ctx.request_pool = request_pool_name;
     request->__set_per_host_mem_estimate(per_host_mem_estimate);
+    request->__set_dedicated_coord_mem_estimate(coord_mem_estimate);
+    request->__set_stmt_type(TStmtType::QUERY);
 
     RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "pool1");
     TUniqueId* query_id = pool_.Add(new TUniqueId()); // always 0,0
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
-    QuerySchedule* query_schedule =
-        pool_.Add(new QuerySchedule(*query_id, *request, *query_options, profile));
+    query_options->__set_mem_limit(mem_limit);
+    QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
+        *query_id, *request, *query_options, is_dedicated_coord, profile));
     query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
     query_schedule->UpdateMemoryRequirements(config);
 
@@ -99,6 +105,13 @@ class AdmissionControllerTest : public testing::Test {
     return query_schedule;
   }
 
+  /// Same as previous MakeQuerySchedule with fewer inputs (more default params).
+  QuerySchedule* MakeQuerySchedule(string request_pool_name, TPoolConfig& config,
+      const int num_hosts, const int per_host_mem_estimate) {
+    return MakeQuerySchedule(request_pool_name, 0, config, num_hosts,
+        per_host_mem_estimate, per_host_mem_estimate, false);
+  }
+
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
       int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
@@ -107,6 +120,7 @@ class AdmissionControllerTest : public testing::Test {
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
       backend_exec_params->admit_mem_limit = admit_mem_limit;
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
+      if (i == 0) backend_exec_params->contains_coord_fragment = true;
       const string host_name = Substitute("host$0", i);
       per_backend_exec_params->emplace(
           MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -684,4 +698,208 @@ TEST_F(AdmissionControllerTest, PoolDisabled) {
       /* max_mem_resources */ 0, /* max_memory_multiple */ 0);
 }
 
+// Basic tests of the QuerySchedule object to confirm that a query with different
+// coordinator and executor memory estimates calculates memory to admit correctly
+// for various combinations of memory limit configurations.
+TEST_F(AdmissionControllerTest, DedicatedCoordQuerySchedule) {
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = admission_controller->request_pool_service_;
+
+  const int64_t PER_EXEC_MEM_ESTIMATE = 512 * MEGABYTE;
+  const int64_t COORD_MEM_ESTIMATE = 150 * MEGABYTE;
+  TPoolConfig pool_config;
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+
+  // For query only running on the coordinator, the per_backend_mem_to_admit should be 0.
+  QuerySchedule* query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 1, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(0, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit());
+
+  // Make sure executors and coordinators are assigned memory to admit appropriately and
+  // that the cluster memory to admit is calculated correctly.
+  query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->GetDedicatedCoordMemoryEstimate());
+  ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, query_schedule->GetPerExecutorMemoryEstimate());
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(-1, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(-1, query_schedule->coord_backend_mem_limit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE + PER_EXEC_MEM_ESTIMATE,
+      query_schedule->GetClusterMemoryToAdmit());
+
+  // Set the min_query_mem_limit in pool_config. min_query_mem_limit should
+  // not be enforced on the coordinator. Also ensure mem limits are set for both.
+  query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  pool_config.__set_min_query_mem_limit(700 * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_limit());
+
+  // Make sure coordinator's mem to admit is adjusted based on its own minimum mem
+  // reservation.
+  query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  int64_t coord_min_reservation = 200 * MEGABYTE;
+  int64_t min_coord_mem_limit_required =
+      ReservationUtil::GetMinMemLimitFromReservation(coord_min_reservation);
+  pool_config.__set_min_query_mem_limit(700 * MEGABYTE);
+  query_schedule->set_coord_min_reservation(200 * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(min_coord_mem_limit_required, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(min_coord_mem_limit_required, query_schedule->coord_backend_mem_limit());
+
+  // Set mem_limit query option.
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  query_schedule = MakeQuerySchedule("default", GIGABYTE, pool_config, 2,
+      PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(GIGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(GIGABYTE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(GIGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(GIGABYTE, query_schedule->coord_backend_mem_limit());
+
+  // Set mem_limit query option and max_query_mem_limit. In this case, max_query_mem_limit
+  // will be enforced on both coordinator and executor.
+  query_schedule = MakeQuerySchedule("default", GIGABYTE, pool_config, 2,
+      PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  pool_config.__set_max_query_mem_limit(700 * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->coord_backend_mem_limit());
+}
+
+// Test admission decisions for clusters with dedicated coordinators, where different
+// amounts of memory should be admitted on coordinators and executors.
+TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = admission_controller->request_pool_service_;
+
+  TPoolConfig pool_config;
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  pool_config.__set_max_mem_resources(2*GIGABYTE); // to enable memory based admission.
+
+  // Set up a query schedule to test.
+  const int64_t PER_EXEC_MEM_ESTIMATE = GIGABYTE;
+  const int64_t COORD_MEM_ESTIMATE = 150 * MEGABYTE;
+  QuerySchedule* query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
+  // Add coordinator backend.
+  BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
+  coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
+  coord_exec_params->contains_coord_fragment = true;
+  coord_exec_params->thread_reservation = 1;
+  const string coord_host_name = Substitute("host$0", 1);
+  TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
+  const string coord_host = TNetworkAddressToString(coord_addr);
+  per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
+  // Add executor backend.
+  BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+  backend_exec_params->admit_mem_limit = GIGABYTE;
+  backend_exec_params->thread_reservation = 1;
+  const string exec_host_name = Substitute("host$0", 2);
+  TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
+  const string exec_host = TNetworkAddressToString(exec_addr);
+  per_backend_exec_params->emplace(exec_addr, *backend_exec_params);
+  query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
+  string not_admitted_reason;
+  // Test 1: coord's admit_mem_limit < executor's admit_mem_limit. Query should not
+  // be rejected because query fits on both executor and coordinator. It should be
+  // queued if there is not enough capacity.
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_FALSE(admission_controller->RejectForSchedule(
+      *query_schedule, pool_config, 2, 2, &not_admitted_reason));
+  ASSERT_TRUE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  // Coord does not have enough available memory.
+  admission_controller->host_stats_[coord_host].mem_reserved = 500 * MEGABYTE;
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host1:25000. Needed 150.00 MB but only "
+      "12.00 MB out of 512.00 MB was available.");
+  not_admitted_reason.clear();
+  // Neither coordinator nor executor has enough available memory.
+  admission_controller->host_stats_[exec_host].mem_reserved = 500 * MEGABYTE;
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host2:25000. Needed 1.00 GB but only "
+      "524.00 MB out of 1.00 GB was available.");
+  not_admitted_reason.clear();
+  // Executor does not have enough available memory.
+  admission_controller->host_stats_[coord_host].mem_reserved = 0;
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host2:25000. Needed 1.00 GB but only "
+      "524.00 MB out of 1.00 GB was available.");
+  not_admitted_reason.clear();
+  admission_controller->host_stats_[exec_host].mem_reserved = 0;
+
+  // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
+  // coord's admit_mem_limit is less than mem_to_admit on the coord.
+  // Re-using previous QuerySchedule object.
+  coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
+  (*per_backend_exec_params)[coord_addr] = *coord_exec_params;
+  query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
+  ASSERT_TRUE(admission_controller->RejectForSchedule(
+      *query_schedule, pool_config, 2, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "request memory needed 150.00 MB is greater than memory available for "
+      "admission 100.00 MB of host1:25000");
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host1:25000. Needed 150.00 MB but only "
+      "100.00 MB out of 100.00 MB was available.");
+  not_admitted_reason.clear();
+
+  // Test 3: Make sure that coord and executors have separate checks for whether their
+  // mem limits can accommodate their respective initial reservations.
+    query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+    pool_config.__set_min_query_mem_limit(MEGABYTE); // to auto set mem_limit(s).
+    query_schedule->UpdateMemoryRequirements(pool_config);
+    query_schedule->set_largest_min_reservation(600 * MEGABYTE);
+    query_schedule->set_coord_min_reservation(50 * MEGABYTE);
+    ASSERT_TRUE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    // Coordinator reservation doesn't fit.
+    query_schedule->set_coord_min_reservation(200 * MEGABYTE);
+    ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater "
+        "than memory available to the query for buffer reservations. Memory reservation "
+        "needed given the current plan: 200.00 MB");
+    // Neither coordinator nor executor reservation fits.
+    query_schedule->set_largest_min_reservation(GIGABYTE);
+    ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater "
+        "than memory available to the query for buffer reservations. Memory reservation "
+        "needed given the current plan: 1.00 GB");
+    // Executor reservation doesn't fit.
+    query_schedule->set_coord_min_reservation(50 * MEGABYTE);
+    query_schedule->set_largest_min_reservation(GIGABYTE);
+    ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater "
+        "than memory available to the query for buffer reservations. Memory reservation "
+        "needed given the current plan: 1.00 GB");
+}
+
 } // end namespace impala
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index edaf5e5..d31c50b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,6 +56,8 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
+DECLARE_bool(is_executor);
+
 namespace impala {
 
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -202,7 +204,7 @@ const string REASON_REQ_OVER_POOL_MEM =
     "The total memory needed is the per-node MEM_LIMIT times the number of nodes "
     "executing the query. See the Admission Control documentation for more information.";
 const string REASON_REQ_OVER_NODE_MEM =
-    "request memory needed $0 per node is greater than memory available for admission $1 "
+    "request memory needed $0 is greater than memory available for admission $1 "
     "of $2.\n\nUse the MEM_LIMIT query option to indicate how much memory is required "
     "per node.";
 const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED =
@@ -230,7 +232,7 @@ const string POOL_MEM_NOT_AVAILABLE =
     "Not enough aggregate memory available in pool $0 "
     "with max mem resources $1 ($2). Needed $3 but only $4 was available.$5";
 // $0 = host name, $1 = host mem needed, $3 = host mem available, $4 = staleness detail
-const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0."
+const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0. "
     "Needed $1 but only $2 out of $3 was available.$4";
 
 // $0 = host name, $1 = num admitted, $2 = max requests
@@ -392,18 +394,20 @@ void AdmissionController::PoolStats::Dequeue(bool timed_out) {
 }
 
 void AdmissionController::UpdateHostStats(
-    const QuerySchedule& schedule, int64_t per_node_mem, int64_t num_queries) {
-  DCHECK_NE(per_node_mem, 0);
-  DCHECK(num_queries == 1 || num_queries == -1)
-      << "Invalid number of queries: " << num_queries;
+    const QuerySchedule& schedule, bool is_admitting) {
+  int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+        schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
+    if (!is_admitting) mem_to_admit *= -1;
     const string host = TNetworkAddressToString(host_addr);
     VLOG_ROW << "Update admitted mem reserved for host=" << host
              << " prev=" << PrintBytes(host_stats_[host].mem_admitted)
-             << " new=" << PrintBytes(host_stats_[host].mem_admitted + per_node_mem);
-    host_stats_[host].mem_admitted += per_node_mem;
+             << " new=" << PrintBytes(host_stats_[host].mem_admitted + mem_to_admit);
+    host_stats_[host].mem_admitted += mem_to_admit;
     DCHECK_GE(host_stats_[host].mem_admitted, 0);
+    int num_queries = is_admitting ? 1 : -1;
     VLOG_ROW << "Update admitted queries for host=" << host
              << " prev=" << host_stats_[host].num_admitted
              << " new=" << host_stats_[host].num_admitted + num_queries;
@@ -412,23 +416,34 @@ void AdmissionController::UpdateHostStats(
   }
 }
 
+// Helper method used by CanAccommodateMaxInitialReservation(). Returns true if the given
+// 'mem_limit' can accommodate 'buffer_reservation'. If not, returns false and the
+// details about the memory shortage in 'mem_unavailable_reason'.
+static bool CanMemLimitAccommodateReservation(
+    int64_t mem_limit, int64_t buffer_reservation, string* mem_unavailable_reason) {
+  if (mem_limit <= 0) return true; // No mem limit.
+  const int64_t max_reservation =
+      ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
+  if (buffer_reservation <= max_reservation) return true;
+  const int64_t required_mem_limit =
+      ReservationUtil::GetMinMemLimitFromReservation(buffer_reservation);
+  *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
+      PrintBytes(buffer_reservation), PrintBytes(required_mem_limit));
+  return false;
+}
+
 bool AdmissionController::CanAccommodateMaxInitialReservation(
     const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
     string* mem_unavailable_reason) {
-  const int64_t per_backend_mem_limit = schedule.per_backend_mem_limit();
-  if (per_backend_mem_limit > 0) {
-    const int64_t max_reservation =
-        ReservationUtil::GetReservationLimitFromMemLimit(per_backend_mem_limit);
-    const int64_t largest_min_mem_reservation = schedule.largest_min_reservation();
-    if (largest_min_mem_reservation > max_reservation) {
-      const int64_t required_mem_limit =
-          ReservationUtil::GetMinMemLimitFromReservation(largest_min_mem_reservation);
-      *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(largest_min_mem_reservation), PrintBytes(required_mem_limit));
-      return false;
-    }
-  }
-  return true;
+  const int64_t executor_mem_limit = schedule.per_backend_mem_limit();
+  const int64_t executor_min_reservation = schedule.largest_min_reservation();
+  const int64_t coord_mem_limit = schedule.coord_backend_mem_limit();
+  const int64_t coord_min_reservation = schedule.coord_min_reservation();
+  return CanMemLimitAccommodateReservation(
+             executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
+      && (!schedule.requiresCoordinatorFragment()
+             || CanMemLimitAccommodateReservation(
+                    coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
 }
 
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -444,11 +459,10 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   //    specified.
   // 2) Each individual backend must have enough mem available within its process limit
   //    to execute the query.
-  int64_t per_host_mem_to_admit = schedule.per_backend_mem_to_admit();
-  int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
 
   // Case 1:
   PoolStats* stats = GetPoolStats(schedule);
+  int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
   VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString()
            << " executor_group=" << schedule.executor_group()
            << " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit)
@@ -464,6 +478,8 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   }
 
   // Case 2:
+  int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit();
+  int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
@@ -471,15 +487,17 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+        coord_mem_to_admit : executor_mem_to_admit;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
              << " mem_admitted=" << PrintBytes(mem_admitted)
-             << " needs=" << PrintBytes(per_host_mem_to_admit)
+             << " needs=" << PrintBytes(mem_to_admit)
              << " admit_mem_limit=" << PrintBytes(admit_mem_limit);
     int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
-    if (effective_host_mem_reserved + per_host_mem_to_admit > admit_mem_limit) {
+    if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) {
       *mem_unavailable_reason =
-          Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(per_host_mem_to_admit),
+          Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
               PrintBytes(max(admit_mem_limit - effective_host_mem_reserved, 0L)),
               PrintBytes(admit_mem_limit), GetStalenessDetailLocked(" "));
       return false;
@@ -610,13 +628,15 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
     string* rejection_reason) {
   DCHECK(rejection_reason != nullptr && rejection_reason->empty());
 
-  // Compute the max (over all backends) and cluster total (across all backends) for
-  // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
-  // min_admit_mem_limit.
+  // Compute the max (over all backends) and the cluster total (across all backends) of
+  // min_mem_reservation_bytes and thread_reservation, the min admit_mem_limit (over
+  // all executors) and the admit_mem_limit of the coordinator.
   pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1);
   int64_t cluster_min_mem_reservation_bytes = 0;
   pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0);
-  pair<const TNetworkAddress*, int64_t> min_admit_mem_limit(
+  pair<const TNetworkAddress*, int64_t> min_executor_admit_mem_limit(
+      nullptr, std::numeric_limits<int64_t>::max());
+  pair<const TNetworkAddress*, int64_t> coord_admit_mem_limit(
       nullptr, std::numeric_limits<int64_t>::max());
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
@@ -629,9 +649,12 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
     if (e.second.thread_reservation > max_thread_reservation.second) {
       max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
     }
-    if (e.second.admit_mem_limit < min_admit_mem_limit.second) {
-      min_admit_mem_limit.first = &e.first;
-      min_admit_mem_limit.second = e.second.admit_mem_limit;
+    if (e.second.contains_coord_fragment) {
+      coord_admit_mem_limit.first = &e.first;
+      coord_admit_mem_limit.second = e.second.admit_mem_limit;
+    } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
+      min_executor_admit_mem_limit.first = &e.first;
+      min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
     }
   }
 
@@ -689,16 +712,30 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
               PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size));
       return true;
     }
-    int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
-    VLOG_ROW << "Checking backend mem with per_backend_mem_to_admit = "
-             << per_backend_mem_to_admit
-             << " and min_admit_mem_limit.second = " << min_admit_mem_limit.second;
-    if (per_backend_mem_to_admit > min_admit_mem_limit.second) {
-      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(per_backend_mem_to_admit), PrintBytes(min_admit_mem_limit.second),
-          TNetworkAddressToString(*min_admit_mem_limit.first));
+    int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit();
+    VLOG_ROW << "Checking executor mem with executor_mem_to_admit = "
+             << executor_mem_to_admit
+             << " and min_executor_admit_mem_limit.second = "
+             << min_executor_admit_mem_limit.second;
+    if (executor_mem_to_admit > min_executor_admit_mem_limit.second) {
+      *rejection_reason =
+          Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(executor_mem_to_admit),
+              PrintBytes(min_executor_admit_mem_limit.second),
+              TNetworkAddressToString(*min_executor_admit_mem_limit.first));
       return true;
     }
+    if (schedule.requiresCoordinatorFragment()) {
+      int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+               << coord_mem_to_admit
+               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+            TNetworkAddressToString(*coord_admit_mem_limit.first));
+        return true;
+      }
+    }
   }
   return false;
 }
@@ -710,8 +747,7 @@ void AdmissionController::PoolStats::UpdateConfigMetrics(
   metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
   metrics_.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit);
   metrics_.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit);
-  metrics_.clamp_mem_limit_query_option->SetValue(
-      pool_cfg.clamp_mem_limit_query_option);
+  metrics_.clamp_mem_limit_query_option->SetValue(pool_cfg.clamp_mem_limit_query_option);
   metrics_.max_running_queries_multiple->SetValue(pool_cfg.max_running_queries_multiple);
   metrics_.max_queued_queries_multiple->SetValue(pool_cfg.max_queued_queries_multiple);
   metrics_.max_memory_multiple->SetValue(pool_cfg.max_memory_multiple);
@@ -896,7 +932,7 @@ void AdmissionController::ReleaseQuery(
     lock_guard<mutex> lock(admission_ctrl_lock_);
     PoolStats* stats = GetPoolStats(schedule);
     stats->Release(schedule, peak_mem_consumption);
-    UpdateHostStats(schedule, -schedule.per_backend_mem_to_admit(), -1);
+    UpdateHostStats(schedule, /*is_admitting=*/false);
     pools_for_updates_.insert(pool_name);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
@@ -1112,9 +1148,11 @@ Status AdmissionController::ComputeGroupSchedules(
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy());
     DCHECK_GT(executor_group->NumExecutors(), 0);
+    bool is_dedicated_coord = !FLAGS_is_executor;
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
-            request.query_options, request.summary_profile, request.query_events);
+            request.query_options, is_dedicated_coord, request.summary_profile,
+            request.query_events);
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
@@ -1170,7 +1208,9 @@ bool AdmissionController::FindGroupToAdmitOrReject(int64_t cluster_size,
     VLOG_QUERY << "Trying to admit id=" << PrintId(schedule->query_id())
                << " in pool_name=" << pool_name << " executor_group_name=" << group_name
                << " per_host_mem_estimate="
-               << PrintBytes(schedule->GetPerHostMemoryEstimate())
+               << PrintBytes(schedule->GetPerExecutorMemoryEstimate())
+               << " dedicated_coord_mem_estimate="
+               << PrintBytes(schedule->GetDedicatedCoordMemoryEstimate())
                << " max_requests=" << max_requests << " ("
                << GetMaxRequestsForPoolDescription(pool_config, cluster_size) << ")"
                << " max_queued=" << max_queued << " ("
@@ -1408,10 +1448,14 @@ void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
            << " per_backend_mem_limit set to: "
            << PrintBytes(schedule->per_backend_mem_limit())
            << " per_backend_mem_to_admit set to: "
-           << PrintBytes(schedule->per_backend_mem_to_admit());
+           << PrintBytes(schedule->per_backend_mem_to_admit())
+           << " coord_backend_mem_limit set to: "
+           << PrintBytes(schedule->coord_backend_mem_limit())
+           << " coord_backend_mem_to_admit set to: "
+           << PrintBytes(schedule->coord_backend_mem_to_admit());
   // Update memory and number of queries.
   pool_stats->Admit(*schedule);
-  UpdateHostStats(*schedule, schedule->per_backend_mem_to_admit(), 1);
+  UpdateHostStats(*schedule, /*is_admitting=*/true);
   // Update summary profile.
   const string& admission_result =
       was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
@@ -1487,6 +1531,10 @@ void AdmissionController::PoolToJsonLocked(const string& pool_name,
         "mem_limit", schedule->per_backend_mem_limit(), document->GetAllocator());
     query_info.AddMember("mem_limit_to_admit", schedule->per_backend_mem_to_admit(),
         document->GetAllocator());
+    query_info.AddMember("coord_mem_limit", schedule->coord_backend_mem_limit(),
+        document->GetAllocator());
+    query_info.AddMember("coord_mem_to_admit",
+        schedule->coord_backend_mem_to_admit(), document->GetAllocator());
     query_info.AddMember("num_backends", schedule->per_backend_exec_params().size(),
         document->GetAllocator());
     queued_queries.PushBack(query_info, document->GetAllocator());
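The per-host check in HasAvailableMemResources() above (Case 2) can be illustrated with a minimal standalone sketch. It is not the Impala implementation: HostState and FirstHostWithoutMemory are hypothetical names that flatten HostStats and BackendExecParams into one struct, and only the selection between the coordinator and executor amounts is modelled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for the per-host state consulted by the check.
struct HostState {
  std::string host_id;
  int64_t mem_reserved;          // memory currently reserved on the host
  int64_t mem_admitted;          // memory already admitted on the host
  int64_t admit_mem_limit;       // admission limit of the host
  bool contains_coord_fragment;  // true for the backend running the coordinator fragment
};

// Returns the id of the first host that cannot fit the query, or "" if every host fits.
// The coordinator backend is charged coord_mem_to_admit, all others executor_mem_to_admit.
std::string FirstHostWithoutMemory(const std::vector<HostState>& hosts,
    int64_t executor_mem_to_admit, int64_t coord_mem_to_admit) {
  assert(executor_mem_to_admit >= 0 && coord_mem_to_admit >= 0);
  for (const HostState& h : hosts) {
    const int64_t mem_to_admit =
        h.contains_coord_fragment ? coord_mem_to_admit : executor_mem_to_admit;
    // Be conservative: use the larger of the reserved and admitted amounts.
    const int64_t effective_reserved = std::max(h.mem_reserved, h.mem_admitted);
    if (effective_reserved + mem_to_admit > h.admit_mem_limit) return h.host_id;
  }
  return "";
}
```

With a small coordinator estimate, a nearly full coordinator host can still accept the query, whereas charging it the executor amount would have blocked admission.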
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index d71327c..7cf2343 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -770,16 +770,19 @@ class AdmissionController {
   bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       int64_t cluster_size, bool admit_from_queue, std::string* not_admitted_reason);
 
-  /// Returns true if the per host mem limit for the query represented by 'schedule' is
-  /// large enough to accommodate the largest initial reservation required. Otherwise,
-  /// returns false with the details about the memory shortage in
-  /// 'mem_unavailable_reason'. Possible cases where it can return false are:
+  /// Returns true if all executors can accommodate the largest initial reservation of
+  /// any executor and the backend running the coordinator fragment can accommodate its
+  /// own initial reservation. Otherwise, returns false with the details about the memory
+  /// shortage in 'mem_unavailable_reason'. Possible cases where it can return false are:
   /// 1. The pool.max_query_mem_limit is set too low
   /// 2. mem_limit in query options is set low and no max/min_query_mem_limit is set in
   ///    the pool configuration.
   /// 3. mem_limit in query options is set low and min_query_mem_limit is also set low.
   /// 4. mem_limit in query options is set low and the pool.min_query_mem_limit is set
   ///    to a higher value but pool.clamp_mem_limit_query_option is false.
+  /// 5. If a dedicated coordinator is used and the mem_limit in query options is set
+  ///    lower than what is required to support the sum of initial memory reservations of
+  ///    the fragments scheduled on the coordinator.
   static bool CanAccommodateMaxInitialReservation(const QuerySchedule& schedule,
       const TPoolConfig& pool_cfg, std::string* mem_unavailable_reason);
 
@@ -800,11 +803,10 @@ class AdmissionController {
   bool HasAvailableSlot(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       string* unavailable_reason);
 
-  /// Adds 'per_node_mem' and 'num_queries' to the per-host stats in host_stats_ for each
-  /// host in 'schedule'. Must hold admission_ctrl_lock_. Note that 'per_node_mem' and
-  /// 'num_queries' may be negative when a query completes.
-  void UpdateHostStats(
-      const QuerySchedule& schedule, int64_t per_node_mem, int64_t num_queries);
+  /// Updates the memory admitted and the number of queries running for each host in
+  /// 'schedule'. If 'is_admitting' is true, the memory admitted and the number of
+  /// queries are increased; otherwise they are decreased.
+  void UpdateHostStats(const QuerySchedule& schedule, bool is_admitting);
 
   /// Rejection happens in several stages
   /// 1) Based on static pool configuration
@@ -938,6 +940,8 @@ class AdmissionController {
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
   FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
   FRIEND_TEST(AdmissionControllerTest, QueryRejection);
+  FRIEND_TEST(AdmissionControllerTest, DedicatedCoordQuerySchedule);
+  FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
   friend class AdmissionControllerTest;
 };
 
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 84d939a..fceee52 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -35,10 +35,15 @@ using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using namespace impala;
 
+DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
+    "Hidden option. If set to false, falls back to the legacy memory estimation logic "
+    "for dedicated coordinators, wherein the same per-backend estimate is used for both "
+    "coordinators and executors.");
+
 namespace impala {
 
-QuerySchedule::QuerySchedule(const TUniqueId& query_id,
-    const TQueryExecRequest& request, const TQueryOptions& query_options,
+QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
+    const TQueryOptions& query_options, bool is_dedicated_coord,
     RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
   : query_id_(query_id),
     request_(request),
@@ -46,18 +51,21 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
     summary_profile_(summary_profile),
     query_events_(query_events),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   Init();
 }
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+    const TQueryOptions& query_options, bool is_dedicated_coord,
+    RuntimeProfile* summary_profile)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -81,7 +89,7 @@ void QuerySchedule::Init() {
 
   // mark coordinator fragment
   const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
-  if (request_.stmt_type == TStmtType::QUERY) {
+  if (requiresCoordinatorFragment()) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
     next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -177,11 +185,16 @@ void QuerySchedule::Validate() const {
   // TODO: add validation for BackendExecParams
 }
 
-int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
+int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
   DCHECK(request_.__isset.per_host_mem_estimate);
   return request_.per_host_mem_estimate;
 }
 
+int64_t QuerySchedule::GetDedicatedCoordMemoryEstimate() const {
+  DCHECK(request_.__isset.dedicated_coord_mem_estimate);
+  return request_.dedicated_coord_mem_estimate;
+}
+
 TUniqueId QuerySchedule::GetNextInstanceId() {
   TUniqueId result = next_instance_id_;
   ++next_instance_id_.lo;
@@ -200,7 +213,6 @@ const TPlanFragment* QuerySchedule::GetCoordFragment() const {
   return fragment;
 }
 
-
 void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
   fragments->clear();
   for (const TPlanExecInfo& plan_info: request_.plan_exec_info) {
@@ -211,7 +223,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  DCHECK_EQ(request_.stmt_type, TStmtType::QUERY);
+  DCHECK(requiresCoordinatorFragment());
   const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
   const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
   DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -235,32 +247,51 @@ int QuerySchedule::GetNumFragmentInstances() const {
 }
 
 int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
-  return per_backend_mem_to_admit() *  per_backend_exec_params_.size();
+  if (!requiresCoordinatorFragment()) {
+    // For this case, there will be no coordinator fragment so only use the per
+    // executor mem to admit while accounting for admitted memory. This will also ensure
+    // the per backend mem admitted accounting is consistent with the cluster-wide mem
+    // admitted.
+    return per_backend_mem_to_admit() * per_backend_exec_params_.size();
+  } else {
+    return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+        + coord_backend_mem_to_admit();
+  }
+}
+
+bool QuerySchedule::UseDedicatedCoordEstimates() const {
+  return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
 }
 
 void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   // If the min_query_mem_limit and max_query_mem_limit are not set in the pool config
-  // then it falls back to traditional(old) behavior, which means that, if for_admission
-  // is false, it returns the mem_limit if it is set in the query options, else returns -1
-  // which means no limit; if for_admission is true, it returns the mem_limit if it is set
-  // in the query options, else returns the per host mem estimate calculated during
-  // planning.
+  // then it falls back to the traditional (old) behavior: the mem_limit is taken from
+  // the query options if set there, else it is set to -1 (no limit).
   bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
 
   per_backend_mem_to_admit_ = 0;
+  coord_backend_mem_to_admit_ = 0;
   bool has_query_option = false;
   if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
     per_backend_mem_to_admit_ = query_options().mem_limit;
+    coord_backend_mem_to_admit_ = query_options().mem_limit;
     has_query_option = true;
   }
 
   if (!has_query_option) {
-    per_backend_mem_to_admit_ = GetPerHostMemoryEstimate();
+    per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
+    coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
+        GetDedicatedCoordMemoryEstimate() :
+        GetPerExecutorMemoryEstimate();
     if (!mimic_old_behaviour) {
-      int64_t min_mem_limit_required = ReservationUtil::GetMinMemLimitFromReservation(
-          largest_min_reservation());
+      int64_t min_mem_limit_required =
+          ReservationUtil::GetMinMemLimitFromReservation(largest_min_reservation());
       per_backend_mem_to_admit_ = max(per_backend_mem_to_admit_, min_mem_limit_required);
+      int64_t min_coord_mem_limit_required =
+          ReservationUtil::GetMinMemLimitFromReservation(coord_min_reservation());
+      coord_backend_mem_to_admit_ =
+          max(coord_backend_mem_to_admit_, min_coord_mem_limit_required);
     }
   }
 
@@ -268,21 +299,38 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
+      if (!UseDedicatedCoordEstimates() || has_query_option) {
+        // The minimum mem limit option does not apply to dedicated coordinators -
+        // this would result in over-reserving of memory. Treat coordinator and
+        // executor mem limits the same if the query option was explicitly set.
+        coord_backend_mem_to_admit_ =
+            max(coord_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
+      }
     }
     if (pool_cfg.max_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           min(per_backend_mem_to_admit_, pool_cfg.max_query_mem_limit);
+      coord_backend_mem_to_admit_ =
+          min(coord_backend_mem_to_admit_, pool_cfg.max_query_mem_limit);
     }
   }
 
   // Cap the memory estimate at the amount of physical memory available. The user's
   // provided value or the estimate from planning can each be unreasonable.
   per_backend_mem_to_admit_ = min(per_backend_mem_to_admit_, MemInfo::physical_mem());
+  coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
+
+  // If the query is only scheduled to run on the coordinator, admit no executor memory.
+  if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
+    per_backend_mem_to_admit_ = 0;
+  }
 
   if (mimic_old_behaviour && !has_query_option) {
     per_backend_mem_limit_ = -1;
+    coord_backend_mem_limit_ = -1;
   } else {
     per_backend_mem_limit_ = per_backend_mem_to_admit_;
+    coord_backend_mem_limit_ = coord_backend_mem_to_admit_;
   }
 }
 
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6143fb2..ad169c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -81,6 +81,9 @@ struct BackendExecParams {
 
   // The maximum number of queries that this backend can execute concurrently.
   int64_t admit_num_queries_limit = 0;
+
+  // Indicates whether this backend will run the coordinator fragment.
+  bool contains_coord_fragment = false;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -159,12 +162,14 @@ struct FragmentExecParams {
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
   /// For testing only: build a QuerySchedule object but do not run Init().
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile);
 
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
@@ -179,7 +184,12 @@ class QuerySchedule {
   const std::string& request_pool() const { return request().query_ctx.request_pool; }
 
   /// Returns the estimated memory (bytes) per-node from planning.
-  int64_t GetPerHostMemoryEstimate() const;
+  int64_t GetPerExecutorMemoryEstimate() const;
+
+  /// Returns the estimated memory (bytes) for the coordinator backend returned by the
+  /// planner. This estimate is only meaningful if this schedule was generated on a
+  /// dedicated coordinator.
+  int64_t GetDedicatedCoordMemoryEstimate() const;
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
@@ -239,15 +249,26 @@ class QuerySchedule {
 
   int64_t largest_min_reservation() const { return largest_min_reservation_; }
 
+  int64_t coord_min_reservation() const { return coord_min_reservation_; }
+
   /// Must call UpdateMemoryRequirements() at least once before calling this.
   int64_t per_backend_mem_limit() const { return per_backend_mem_limit_; }
 
   /// Must call UpdateMemoryRequirements() at least once before calling this.
   int64_t per_backend_mem_to_admit() const {
-    DCHECK_GT(per_backend_mem_to_admit_, 0);
+    DCHECK_GE(per_backend_mem_to_admit_, 0);
     return per_backend_mem_to_admit_;
   }
 
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t coord_backend_mem_limit() const { return coord_backend_mem_limit_; }
+
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t coord_backend_mem_to_admit() const {
+    DCHECK_GT(coord_backend_mem_to_admit_, 0);
+    return coord_backend_mem_to_admit_;
+  }
+
   void set_per_backend_exec_params(const PerBackendExecParams& params) {
     per_backend_exec_params_ = params;
   }
@@ -256,10 +277,18 @@ class QuerySchedule {
     largest_min_reservation_ = largest_min_reservation;
   }
 
+  void set_coord_min_reservation(const int64_t coord_min_reservation) {
+    coord_min_reservation_ = coord_min_reservation;
+  }
+
   /// Returns the Cluster wide memory admitted by the admission controller.
   /// Must call UpdateMemoryRequirements() at least once before calling this.
   int64_t GetClusterMemoryToAdmit() const;
 
+  /// Returns true if coordinator estimates calculated by the planner and specialized for
+  /// a dedicated coordinator are to be used for estimating memory requirements.
+  bool UseDedicatedCoordEstimates() const;
+
   /// Populates or updates the per host query memory limit and the amount of memory to be
   /// admitted based on the pool configuration passed to it. Must be called at least once
   /// before making any calls to per_backend_mem_to_admit(), per_backend_mem_limit() and
@@ -270,6 +299,11 @@ class QuerySchedule {
 
   void set_executor_group(string executor_group);
 
+  /// Returns true if a coordinator fragment is required based on the query stmt type.
+  bool requiresCoordinatorFragment() const {
+    return request_.stmt_type == TStmtType::QUERY;
+  }
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -304,19 +338,36 @@ class QuerySchedule {
   /// Used to generate consecutive fragment instance ids.
   TUniqueId next_instance_id_;
 
-  /// The largest min memory reservation across all backends. Set in
+  /// The largest min memory reservation across all executors. Set in
   /// Scheduler::Schedule().
   int64_t largest_min_reservation_ = 0;
 
-  /// The memory limit per backend that will be imposed on the query.
+  /// The memory limit per executor that will be imposed on the query.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully. -1 means no limit.
+  int64_t per_backend_mem_limit_ = -1;
+
+  /// The per executor memory used for admission accounting.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully. Can be zero if the query is only scheduled to run on the coordinator.
+  int64_t per_backend_mem_to_admit_ = -1;
+
+  /// The memory limit for the coordinator that will be imposed on the query. Used only if
+  /// the query has a coordinator fragment.
   /// Set by the admission controller with a value that is only valid if it was admitted
   /// successfully. -1 means no limit.
-  int64_t per_backend_mem_limit_ = 0;
+  int64_t coord_backend_mem_limit_ = -1;
 
-  /// The per backend memory used for admission accounting.
+  /// The coordinator memory used for admission accounting.
   /// Set by the admission controller with a value that is only valid if it was admitted
   /// successfully.
-  int64_t per_backend_mem_to_admit_ = 0;
+  int64_t coord_backend_mem_to_admit_ = -1;
+
+  /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
+  int64_t coord_min_reservation_ = 0;
+
+  /// Indicates whether the coordinator fragment runs on a dedicated coordinator.
+  bool is_dedicated_coord_ = false;
 
   /// The name of the executor group that this schedule was computed for. Set by the
   /// Scheduler and only valid after scheduling completes successfully.
@@ -327,7 +378,6 @@ class QuerySchedule {
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
 };
-
 }
 
 #endif
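The interaction between UpdateMemoryRequirements() and GetClusterMemoryToAdmit() in the query-schedule changes above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the Impala code: MemInputs, MemToAdmit, ComputeMemToAdmit and ClusterMemToAdmit are hypothetical names, the sketch covers only the case where no mem_limit query option is set and the pool limits apply, and it leaves out the reservation-based minimum, the physical-memory cap and the coordinator-only special case.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical, flattened inputs; the real code reads these from the planner's
// request and the pool configuration.
struct MemInputs {
  int64_t executor_estimate;         // per-executor estimate from planning
  int64_t dedicated_coord_estimate;  // coordinator estimate from planning
  bool use_dedicated_coord_estimates;
  int64_t pool_min_query_mem_limit;  // 0 means not set
  int64_t pool_max_query_mem_limit;  // 0 means not set
};

struct MemToAdmit {
  int64_t executor;
  int64_t coord;
};

// Sketch of the no-query-option path: start from the estimates, then clamp.
MemToAdmit ComputeMemToAdmit(const MemInputs& in) {
  MemToAdmit out;
  out.executor = in.executor_estimate;
  out.coord = in.use_dedicated_coord_estimates ? in.dedicated_coord_estimate
                                               : in.executor_estimate;
  if (in.pool_min_query_mem_limit > 0) {
    out.executor = std::max(out.executor, in.pool_min_query_mem_limit);
    // The pool minimum is skipped for a dedicated coordinator to avoid over-reserving.
    if (!in.use_dedicated_coord_estimates) {
      out.coord = std::max(out.coord, in.pool_min_query_mem_limit);
    }
  }
  if (in.pool_max_query_mem_limit > 0) {
    out.executor = std::min(out.executor, in.pool_max_query_mem_limit);
    out.coord = std::min(out.coord, in.pool_max_query_mem_limit);
  }
  return out;
}

// Cluster-wide total: one backend is charged the coordinator amount when the
// query has a coordinator fragment, every other backend the executor amount.
int64_t ClusterMemToAdmit(
    const MemToAdmit& m, int64_t num_backends, bool has_coord_fragment) {
  assert(num_backends >= 1);
  if (!has_coord_fragment) return m.executor * num_backends;
  return m.executor * (num_backends - 1) + m.coord;
}
```

For example, with an executor estimate of 2000, a coordinator estimate of 100, a pool minimum of 500 and a pool maximum of 1500, four backends with a coordinator fragment are charged 3 * 1500 + 100 in total rather than 4 * 1500.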
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index de34738..b5e6807 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,21 +673,28 @@ void Scheduler::ComputeBackendExecParams(
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
       be_params.thread_reservation += f.fragment.thread_reservation;
+      if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
     }
   }
 
   int64_t largest_min_reservation = 0;
+  int64_t coord_min_reservation = 0;
   for (auto& backend : per_backend_params) {
     const TNetworkAddress& host = backend.first;
     const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
     backend.second.admit_mem_limit = be_desc.admit_mem_limit;
     backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
     backend.second.be_desc = be_desc;
-    largest_min_reservation =
-        max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
+    if (backend.second.contains_coord_fragment) {
+      coord_min_reservation = backend.second.min_mem_reservation_bytes;
+    } else {
+      largest_min_reservation =
+          max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
+    }
   }
   schedule->set_per_backend_exec_params(per_backend_params);
   schedule->set_largest_min_reservation(largest_min_reservation);
+  schedule->set_coord_min_reservation(coord_min_reservation);
 
   stringstream min_mem_reservation_ss;
   stringstream num_fragment_instances_ss;
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index c5b77a4..c24a8fb 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -935,8 +935,10 @@ void ImpalaHttpHandler::AdmissionStateHandler(
   // Now get running queries from CRS map.
   struct QueryInfo {
     TUniqueId query_id;
-    int64_t mem_limit;
-    int64_t mem_limit_for_admission;
+    int64_t executor_mem_limit;
+    int64_t executor_mem_to_admit;
+    int64_t coord_mem_limit;
+    int64_t coord_mem_to_admit;
     unsigned long num_backends;
   };
   unordered_map<string, vector<QueryInfo>> running_queries;
@@ -950,6 +952,8 @@ void ImpalaHttpHandler::AdmissionStateHandler(
       running_queries[request_state->request_pool()].push_back(
           {request_state->query_id(), request_state->schedule()->per_backend_mem_limit(),
               request_state->schedule()->per_backend_mem_to_admit(),
+              request_state->schedule()->coord_backend_mem_limit(),
+              request_state->schedule()->coord_backend_mem_to_admit(),
               static_cast<unsigned long>(
                   request_state->schedule()->per_backend_exec_params().size())});
   });
@@ -970,9 +974,14 @@ void ImpalaHttpHandler::AdmissionStateHandler(
       Value query_info(rapidjson::kObjectType);
       Value query_id(PrintId(info.query_id).c_str(), document->GetAllocator());
       query_info.AddMember("query_id", query_id, document->GetAllocator());
-      query_info.AddMember("mem_limit", info.mem_limit, document->GetAllocator());
       query_info.AddMember(
-          "mem_limit_to_admit", info.mem_limit_for_admission, document->GetAllocator());
+          "mem_limit", info.executor_mem_limit, document->GetAllocator());
+      query_info.AddMember(
+          "mem_limit_to_admit", info.executor_mem_to_admit, document->GetAllocator());
+      query_info.AddMember(
+          "coord_mem_limit", info.coord_mem_limit, document->GetAllocator());
+      query_info.AddMember(
+          "coord_mem_to_admit", info.coord_mem_to_admit, document->GetAllocator());
       query_info.AddMember("num_backends", info.num_backends, document->GetAllocator());
       queries_in_pool.PushBack(query_info, document->GetAllocator());
     }
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index c393e6b..5bb7d20 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -46,10 +46,7 @@ DECLARE_string(principal);
 DECLARE_string(local_library_dir);
 DECLARE_string(server_name);
 DECLARE_string(authorization_policy_provider_class);
-DECLARE_string(authorized_proxy_user_config);
-DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(authorized_proxy_group_config);
-DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_string(catalog_topic_mode);
 DECLARE_string(kudu_master_hosts);
 DECLARE_string(reserved_words_version);
@@ -78,6 +75,8 @@ DECLARE_string(authorization_provider);
 DECLARE_bool(recursively_list_partitions);
 DECLARE_string(query_event_hook_classes);
 DECLARE_int32(query_event_hook_nthreads);
+DECLARE_bool(is_executor);
+DECLARE_bool(use_dedicated_coordinator_estimates);
 
 namespace impala {
 
@@ -155,6 +154,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_recursively_list_partitions(FLAGS_recursively_list_partitions);
   cfg.__set_query_event_hook_classes(FLAGS_query_event_hook_classes);
   cfg.__set_query_event_hook_nthreads(FLAGS_query_event_hook_nthreads);
+  cfg.__set_is_executor(FLAGS_is_executor);
+  cfg.__set_use_dedicated_coordinator_estimates(
+      FLAGS_use_dedicated_coordinator_estimates);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 063329f..9c5a075 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -131,4 +131,8 @@ struct TBackendGflags {
   53: required string query_event_hook_classes
 
   54: required i32 query_event_hook_nthreads
+
+  55: required bool is_executor
+
+  56: required bool use_dedicated_coordinator_estimates
 }
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index d6e5f70..e055bd9 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -444,6 +444,11 @@ struct TQueryExecRequest {
   // max DOP) required threads per host, i.e. the number of threads that this query
   // needs to execute successfully. Does not include "optional" threads.
   11: optional i64 max_per_host_thread_reservation;
+
+  // Estimated coordinator's memory consumption in bytes assuming that the coordinator
+  // fragment will run on a dedicated coordinator. Set by the planner and used by
+  // admission control.
+  12: optional i64 dedicated_coord_mem_estimate;
 }
 
 enum TCatalogOpType {
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index c60778e..b55a546 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -113,6 +113,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // managed by this fragment.
   private long runtimeFiltersMemReservationBytes_ = 0;
 
+  public long getRuntimeFiltersMemReservationBytes() {
+    return runtimeFiltersMemReservationBytes_;
+  }
+
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 087954b..b9d1f30 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -49,6 +49,7 @@ import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
+import org.apache.impala.util.MathUtil;
 import org.apache.impala.util.MaxRowsProcessedVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,9 +70,20 @@ public class Planner {
   // estimates of zero, even if the contained PlanNodes have estimates of zero.
   public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
 
+  // The amount of memory added to a dedicated coordinator's memory estimate to
+  // compensate for underestimation. In the general case estimates for exec
+  // nodes tend to overestimate and should work fine but for estimates in the
+  // 100-500 MB space, underestimates can be a problem. We pick a value of 100MB
+  // because it is trivial for large estimates and small enough to not make a
+  // huge impact on the coordinator's process memory (which ideally would be
+  // large).
+  public static final long DEDICATED_COORD_SAFETY_BUFFER_BYTES = 100 * 1024 * 1024;
+
   public static final ResourceProfile MIN_PER_HOST_RESOURCES =
-      new ResourceProfileBuilder().setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
-      .setMinMemReservationBytes(0).build();
+      new ResourceProfileBuilder()
+          .setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
+          .setMinMemReservationBytes(0)
+          .build();
 
   private final PlannerContext ctx_;
 
@@ -287,6 +299,10 @@ public class Planner {
           request.getMax_per_host_thread_reservation()));
       str.append(String.format("Per-Host Resource Estimates: Memory=%s\n",
           PrintUtils.printBytesRoundedToMb(request.getPer_host_mem_estimate())));
+      if (BackendConfig.INSTANCE.useDedicatedCoordinatorEstimates()) {
+        str.append(String.format("Dedicated Coordinator Resource Estimate: Memory=%s\n",
+            PrintUtils.printBytesRoundedToMb(request.getDedicated_coord_mem_estimate())));
+      }
       hasHeader = true;
     }
     // Warn if the planner is running in DEBUG mode.
@@ -389,10 +405,12 @@ public class Planner {
     // are scheduled on all nodes. The actual per-host resource requirements are computed
     // after scheduling.
     ResourceProfile maxPerHostPeakResources = ResourceProfile.invalid();
+    long totalRuntimeFilterMemBytes = 0;
 
     // Do a pass over all the fragments to compute resource profiles. Compute the
     // profiles bottom-up since a fragment's profile may depend on its descendants.
-    List<PlanFragment> allFragments = planRoots.get(0).getNodesPostOrder();
+    PlanFragment rootFragment = planRoots.get(0);
+    List<PlanFragment> allFragments = rootFragment.getNodesPostOrder();
     for (PlanFragment fragment: allFragments) {
       // Compute the per-node, per-sink and aggregate profiles for the fragment.
       fragment.computeResourceProfile(ctx_.getRootAnalyzer());
@@ -405,8 +423,11 @@ public class Planner {
       // per-fragment-instance peak resources.
       maxPerHostPeakResources = maxPerHostPeakResources.sum(
           fragment.getResourceProfile().multiply(fragment.getNumInstancesPerHost(mtDop)));
+      // Coordinator has to have a copy of each of the runtime filters to perform filter
+      // aggregation.
+      totalRuntimeFilterMemBytes += fragment.getRuntimeFiltersMemReservationBytes();
     }
-    planRoots.get(0).computePipelineMembership();
+    rootFragment.computePipelineMembership();
 
     Preconditions.checkState(maxPerHostPeakResources.getMemEstimateBytes() >= 0,
         maxPerHostPeakResources.getMemEstimateBytes());
@@ -415,13 +436,17 @@ public class Planner {
 
     maxPerHostPeakResources = MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources);
 
-    // TODO: Remove per_host_mem_estimate from the TQueryExecRequest when AC no longer
-    // needs it.
     request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes());
     request.setMax_per_host_min_mem_reservation(
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
         maxPerHostPeakResources.getThreadReservation());
+    // Assuming the root fragment will always run on the coordinator backend, which
+    // might not be true for queries that don't have a coordinator fragment
+    // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
+    request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+        .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+        DEDICATED_COORD_SAFETY_BUFFER_BYTES));
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
           maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index aef3f41..e299c7f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -165,6 +165,10 @@ public class BackendConfig {
     return backendCfg_.getQuery_event_hook_nthreads();
   }
 
+  public boolean useDedicatedCoordinatorEstimates() {
+    return !backendCfg_.is_executor && backendCfg_.use_dedicated_coordinator_estimates;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 492f5b0..f282246 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1496,9 +1496,14 @@ public class Frontend {
       TQueryOptions queryOptions) {
     if (queryOptions.isSetMax_mem_estimate_for_admission()
         && queryOptions.getMax_mem_estimate_for_admission() > 0) {
-      long effectiveMemEstimate = Math.min(queryExecRequest.getPer_host_mem_estimate(),
+      long effectivePerHostMemEstimate = Math.min(
+          queryExecRequest.getPer_host_mem_estimate(),
               queryOptions.getMax_mem_estimate_for_admission());
-      queryExecRequest.setPer_host_mem_estimate(effectiveMemEstimate);
+      queryExecRequest.setPer_host_mem_estimate(effectivePerHostMemEstimate);
+      long effectiveCoordinatorMemEstimate = Math.min(
+          queryExecRequest.getDedicated_coord_mem_estimate(),
+              queryOptions.getMax_mem_estimate_for_admission());
+      queryExecRequest.setDedicated_coord_mem_estimate(effectiveCoordinatorMemEstimate);
     }
   }
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
new file mode 100644
index 0000000..cd53dac
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -0,0 +1,96 @@
+====
+---- QUERY
+# CTAS
+create table test as select id from functional.alltypes where id > 1
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+====
+---- QUERY
+# Truncate table to run the following inserts.
+truncate table test
+====
+---- QUERY
+# Small insert(i.e. values list, runs on coordinator only).
+insert into test values (1), (2), (3)
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 10.00 MB.*
+====
+---- QUERY
+# Large insert where it doesn't run on the coordinator.
+insert into test select id from functional.alltypes where id > 3
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+====
+---- QUERY
+# SELECT with merging exchange (i.e. order by).
+select * from functional.alltypes order by int_col;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=28MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 157.47 MB.*
+====
+---- QUERY
+# SELECT with non-merging exchange.
+select * from functional.alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 133.47 MB.*
+====
+---- QUERY
+# SELECT with a non-grouping aggregate in the coordinator fragment.
+select avg(int_col) from functional.alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=36MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=110MB.*
+row_regex: .*Cluster Memory Admitted: 182.05 MB.*
+====
+---- QUERY
+# SELECT with num_nodes=1 and a complex plan in the coordinator.
+set num_nodes=1;
+select
+  l_returnflag,
+  l_linestatus,
+  sum(l_quantity) as sum_qty,
+  sum(l_extendedprice) as sum_base_price,
+  sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+  sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+  avg(l_quantity) as avg_qty,
+  avg(l_extendedprice) as avg_price,
+  avg(l_discount) as avg_disc,
+  count(*) as count_order
+from
+  tpch.lineitem
+where
+  l_shipdate <= '1998-09-02'
+group by
+  l_returnflag,
+  l_linestatus
+order by
+  l_returnflag,
+  l_linestatus
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=98MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=198MB.*
+row_regex: .*Cluster Memory Admitted: 198.00 MB.*
+====
+---- QUERY
+# SELECT with multiple unpartitioned analytic functions to force the sort and analytics
+# into the coordinator fragment.
+select id,
+min(int_col) over (order by year),
+min(int_col) over (order by bigint_col),
+avg(smallint_col) over (order by int_col),
+max(int_col) over (order by smallint_col rows between unbounded preceding and 1 following)
+from functional.alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=46MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=124MB.*
+row_regex: .*Cluster Memory Admitted: 216.00 MB.*
+====
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index b416270..c85220c 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -47,6 +47,7 @@ from tests.common.test_dimensions import (
     create_uncompressed_text_dimension)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
+from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
 
@@ -473,9 +474,170 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       exec_options['mem_limit'] = long(self.PROC_MEM_TEST_LIMIT * 1.1)
       ex = self.execute_query_expect_failure(self.client, query, exec_options)
       assert ("Rejected query from pool default-pool: request memory needed "
-              "1.10 GB per node is greater than memory available for admission 1.00 GB" in
+              "1.10 GB is greater than memory available for admission 1.00 GB" in
               str(ex)), str(ex)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
+    cluster_size=2)
+  def test_dedicated_coordinator_mem_accounting(self, vector):
+    """Verify that when using dedicated coordinators, the memory admitted for and the
+    mem limit applied to the query fragments running on the coordinator is different than
+    the ones on executors."""
+    self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=True)
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml")
+    + " -use_dedicated_coordinator_estimates false",
+    num_exclusive_coordinators=1,
+    cluster_size=2)
+  def test_dedicated_coordinator_legacy_mem_accounting(self, vector):
+    """Verify that when using dedicated coordinators with specialized dedicated coord
+    estimates turned off using a hidden startup param, the memory admitted for and the
+    mem limit applied to the query fragments running on the coordinator is the same
+    (as expected from legacy behavior)."""
+    self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=False)
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
+    cluster_size=2)
+  def test_sanity_checks_dedicated_coordinator(self, vector):
+    """Test for verifying targeted dedicated coordinator memory estimation behavior."""
+    self.client.set_configuration_option('request_pool', "root.regularPool")
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    exec_options = vector.get_value('exec_option')
+    # Make sure query option MAX_MEM_ESTIMATE_FOR_ADMISSION is enforced on the dedicated
+    # coord estimates. Without this query option the estimate would be > 100MB.
+    expected_mem = 60 * (1 << 20)  # 60MB
+    exec_options['MAX_MEM_ESTIMATE_FOR_ADMISSION'] = expected_mem
+    self.client.set_configuration(exec_options)
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    mem_to_admit = self.__get_mem_limits_admission_debug_page()
+    assert abs(mem_to_admit['coordinator'] - expected_mem) < 0.0001,\
+      "mem_to_admit:" + str(mem_to_admit)
+    assert abs(mem_to_admit['executor'] - expected_mem) < 0.0001, \
+      "mem_to_admit:" + str(mem_to_admit)
+    self.client.close_query(handle)
+
+    # If the query is only scheduled on the coordinator then the mem to admit on executor
+    # should be zero.
+    exec_options['NUM_NODES'] = 1
+    self.client.set_configuration(exec_options)
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    mem_to_admit = self.__get_mem_limits_admission_debug_page()
+    assert abs(mem_to_admit['coordinator'] - expected_mem) < 0.0001, \
+      "mem_to_admit:" + str(mem_to_admit)
+    assert abs(mem_to_admit['executor'] - 0) < 0.0001, \
+      "mem_to_admit:" + str(mem_to_admit)
+    self.client.close_query(handle)
+
+  def __verify_mem_accounting(self, vector, using_dedicated_coord_estimates):
+    """Helper method used by test_dedicated_coordinator_*_mem_accounting that verifies
+    the actual vs expected values for mem admitted and mem limit for both coord and
+    executor. Also verifies that those memory values are different if
+    'using_dedicated_coord_estimates' is true."""
+    self.client.set_configuration_option('request_pool', "root.regularPool")
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+    actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
+    mem_admitted = self.__get_mem_admitted_backends_debug_page()
+    debug_string = " expected_mem_limits:" + str(
+      expected_mem_limits) + " actual_mem_limits:" + str(
+      actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
+    MB = 1 << 20
+    # Easiest way to check float in-equality.
+    assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
+      'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
+    # There may be some rounding errors so keep a margin of 5MB when verifying
+    assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
+      'coordinator']) < 5 * MB, debug_string
+    assert abs(actual_mem_limits['executor'] - expected_mem_limits[
+      'executor']) < 5 * MB, debug_string
+    assert abs(mem_admitted['coordinator'] - expected_mem_limits[
+      'coordinator']) < 5 * MB, debug_string
+    assert abs(
+      mem_admitted['executor'] - expected_mem_limits['executor']) < 5 * MB, debug_string
+
+  def __get_mem_limits_admission_debug_page(self):
+    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
+    mem_limit calculated by the admission controller from the impala admission debug page
+    of the coordinator impala daemon. Returns a dictionary with the keys 'coordinator'
+    and 'executor' and their respective mem values in bytes."""
+    # Based on how the cluster is setup, the first impalad in the cluster is the
+    # coordinator.
+    response_json = self.cluster.impalads[0].service.get_debug_webpage_json("admission")
+    assert 'resource_pools' in response_json
+    assert len(response_json['resource_pools']) == 1
+    assert response_json['resource_pools'][0]['running_queries']
+    assert len(response_json['resource_pools'][0]['running_queries']) == 1
+    query_info = response_json['resource_pools'][0]['running_queries'][0]
+    return {'coordinator': float(query_info["coord_mem_to_admit"]),
+            'executor': float(query_info["mem_limit"])}
+
+  def __get_mem_limits_memz_debug_page(self, query_id):
+    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
+    mem limits enforced on the query (identified by the 'query_id') extracted from
+    mem-tracker's output on the memz debug page of the dedicated coordinator and the
+    executor impala daemons. Returns a dictionary with the keys 'coordinator' and
+    'executor' and their respective mem values in bytes."""
+    metric_name = "Query({0})".format(query_id)
+    # Based on how the cluster is setup, the first impalad in the cluster is the
+    # coordinator.
+    mem_trackers = [MemUsageVerifier(i.service).get_mem_usage_values(metric_name) for i in
+                    self.cluster.impalads]
+    return {'coordinator': float(mem_trackers[0]['limit']),
+            'executor': float(mem_trackers[1]['limit'])}
+
+  def __get_mem_admitted_backends_debug_page(self):
+    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
+    mem admitted to the backends extracted from the backends debug page of the coordinator
+    impala daemon. Returns a dictionary with the keys 'coordinator' and 'executor' and
+    their respective mem values in bytes."""
+    # Based on how the cluster is setup, the first impalad in the cluster is the
+    # coordinator.
+    response_json = self.cluster.impalads[0].service.get_debug_webpage_json("backends")
+    assert 'backends' in response_json
+    assert len(response_json['backends']) == 2
+    ret = dict()
+    from tests.verifiers.mem_usage_verifier import parse_mem_value
+    for backend in response_json['backends']:
+      if backend['is_coordinator']:
+        ret['coordinator'] = parse_mem_value(backend['mem_admitted'])
+      else:
+        ret['executor'] = parse_mem_value(backend['mem_admitted'])
+    return ret
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
+  def test_dedicated_coordinator_planner_estimates(self, vector, unique_database):
+    """Planner tests to add coverage for coordinator estimates when using dedicated
+    coordinators. Also includes coverage for verifying cluster memory admitted."""
+    vector_copy = copy(vector)
+    exec_options = vector.get_value('exec_option')
+    # Remove num_nodes from the options to allow test case runner to set it in one of
+    # the test cases.
+    del exec_options['num_nodes']
+    exec_options['num_scanner_threads'] = 1  # To make estimates consistently reproducible
+    self.run_test_case('QueryTest/dedicated-coord-mem-estimates', vector_copy,
+                       unique_database)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
@@ -510,8 +672,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       exec_options['mem_limit'] = "3G"
       self.execute_query(query, exec_options)
     except ImpalaBeeswaxException as e:
-      assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB per "
-          "node is greater than memory available for admission 2.00 GB of \S+", str(e)), \
+      assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB"
+          " is greater than memory available for admission 2.00 GB of \S+", str(e)), \
           str(e)
     # Exercise queuing checks in admission controller.
     try:
diff --git a/www/admission_controller.tmpl b/www/admission_controller.tmpl
index 3af73ef..069f57c 100644
--- a/www/admission_controller.tmpl
+++ b/www/admission_controller.tmpl
@@ -270,8 +270,10 @@ Time since last statestore update containing admission control topic state (ms):
   <table class='table table-hover table-border'>
     <tr>
       <th>Query ID</th>
-      <th>Memory limit</th>
-      <th>Memory limit used for admission</th>
+      <th>Memory limit for the executors</th>
+      <th>Memory admitted on the executors</th>
+      <th>Memory limit for the coordinator</th>
+      <th>Memory admitted on the coordinator</th>
       <th>Num of backends it will run on</th>
       <th>Details</th>
     </tr>
@@ -280,6 +282,8 @@ Time since last statestore update containing admission control topic state (ms):
       <td>{{query_id}}</td>
       <td class='memory'>{{mem_limit}}</td>
       <td class='memory'>{{mem_limit_to_admit}}</td>
+      <td class='memory'>{{coord_mem_limit}}</td>
+      <td class='memory'>{{coord_mem_to_admit}}</td>
       <td>{{num_backends}}</td>
       <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     </tr>
@@ -290,8 +294,10 @@ Time since last statestore update containing admission control topic state (ms):
   <table class='table table-hover table-border'>
     <tr>
       <th>Query ID</th>
-      <th>Memory limit</th>
-      <th>Memory limit used for admission</th>
+      <th>Memory limit for the executors</th>
+      <th>Memory admitted on the executors</th>
+      <th>Memory limit for the coordinator</th>
+      <th>Memory admitted on the coordinator</th>
       <th>Num of backends it will run on</th>
       <th>Details</th>
     </tr>
@@ -300,6 +306,8 @@ Time since last statestore update containing admission control topic state (ms):
       <td>{{query_id}}</td>
       <td class='memory'>{{mem_limit}}</td>
       <td class='memory'>{{mem_limit_to_admit}}</td>
+      <td class='memory'>{{coord_mem_limit}}</td>
+      <td class='memory'>{{coord_mem_to_admit}}</td>
       <td>{{num_backends}}</td>
       <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     </tr>
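
The dedicated-coordinator estimate built up in the Planner.java and Frontend.java hunks above can be sketched as follows. This is a minimal illustration, not the planner's code: the function and parameter names are hypothetical, and `saturating_add` only assumes that `MathUtil.saturatingAdd` clamps at the signed 64-bit maximum instead of overflowing.

```python
INT64_MAX = (1 << 63) - 1
MB = 1024 * 1024

# Mirrors Planner.DEDICATED_COORD_SAFETY_BUFFER_BYTES from the diff (100 MB).
DEDICATED_COORD_SAFETY_BUFFER_BYTES = 100 * MB


def saturating_add(a, b):
    """Add two non-negative byte counts, clamping at INT64_MAX (assumed behavior)."""
    return min(a + b, INT64_MAX)


def dedicated_coord_mem_estimate(root_fragment_estimate, runtime_filter_bytes,
                                 max_mem_estimate_for_admission=0):
    """Hypothetical helper combining the planner and frontend steps.

    root_fragment_estimate: memory estimate of the root (coordinator) fragment.
    runtime_filter_bytes: sum of runtime filter reservations over all fragments,
        since the coordinator keeps a copy of each filter for aggregation.
    max_mem_estimate_for_admission: query option; values <= 0 mean "not set".
    """
    estimate = saturating_add(
        root_fragment_estimate,
        runtime_filter_bytes + DEDICATED_COORD_SAFETY_BUFFER_BYTES)
    # The frontend caps the estimate when MAX_MEM_ESTIMATE_FOR_ADMISSION is set.
    if max_mem_estimate_for_admission > 0:
        estimate = min(estimate, max_mem_estimate_for_admission)
    return estimate
```

With illustrative inputs, a 16 MB root fragment with no runtime filters yields 116 MB, and setting the cap to 60 MB reduces the estimate to 60 MB, which is the behavior test_sanity_checks_dedicated_coordinator checks for.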


[impala] 01/05: IMPALA-8779, IMPALA-8780: RowBatchQueue re-factoring and BufferedPRS impl

Posted by st...@apache.org.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 699450aadbf45f36617472b7c777dc2d9aad066a
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Wed Jul 17 10:53:25 2019 -0700

    IMPALA-8779, IMPALA-8780: RowBatchQueue re-factoring and BufferedPRS impl
    
    Improves the encapsulation of RowBatchQueue by doing the following
    re-factoring:
    * Renames RowBatchQueue to BlockingRowBatchQueue, which is more
    indicative of what the queue does
    * Re-factors the timers managed by the scan-node into the
    BlockingRowBatchQueue implementation
    * Favors composition over inheritance by re-factoring
    BlockingRowBatchQueue to own a BlockingQueue rather than extending one
    
    The re-factoring lays the groundwork for introducing a generic
    RowBatchQueue that all RowBatch queues inherit from.
    
    Adds a new DequeRowBatchQueue which is a simple wrapper around a
    std::deque that (1) stores unique_ptr to queued RowBatch-es and (2)
    has a maximum capacity.
    
    Implements BufferedPlanRootSink using the new DequeRowBatchQueue.
    DequeRowBatchQueue is generic enough that replacing it with a
    SpillableQueue (queue backed by a BufferedTupleStream) should be
    straightforward. BufferedPlanRootSink is synchronized to protect access
    to DequeRowBatchQueue since the queue is not thread safe.
    
    BufferedPlanRootSink FlushFinal blocks until the consumer thread has
    processed all RowBatches. This ensures that the coordinator fragment
    stays alive until all results are fetched, but allows all other
    fragments to be shut down immediately.
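
The queue described above (a bounded wrapper around a deque that is not thread safe on its own) can be sketched as follows. This is a minimal sketch in Python rather than the C++ of the actual change; the class name, method names, and return conventions are assumptions for illustration, since deque-row-batch-queue.h is not shown in this section. Python has no equivalent of unique_ptr ownership, so the sketch simply stores references.

```python
from collections import deque


class DequeRowBatchQueueSketch:
    """Hypothetical bounded FIFO of row batches. Not thread safe by itself."""

    def __init__(self, max_batches):
        self._max_batches = max_batches
        self._batches = deque()

    def is_full(self):
        return len(self._batches) >= self._max_batches

    def is_empty(self):
        return not self._batches

    def add_batch(self, batch):
        """Enqueue a batch; return False (and drop nothing) if at capacity."""
        if self.is_full():
            return False
        self._batches.append(batch)
        return True

    def get_batch(self):
        """Dequeue the oldest batch, or return None if the queue is empty."""
        if self.is_empty():
            return None
        return self._batches.popleft()
```

Because the queue does no locking, a caller such as the sink would guard every call with its own lock, for example by holding a `threading.Lock` around `add_batch` on the producer side and around `get_batch` on the consumer side.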
    
    Testing:
    * Running core tests
    * Updated tests/query_test/test_result_spooling.py
    
    Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
    Reviewed-on: http://gerrit.cloudera.org:8080/13883
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-plan-root-sink.cc             |  15 +--
 be/src/exec/blocking-plan-root-sink.h              |   3 -
 be/src/exec/buffered-plan-root-sink.cc             | 126 ++++++++++++++++++++-
 be/src/exec/buffered-plan-root-sink.h              |  53 ++++++++-
 be/src/exec/data-sink.cc                           |   1 +
 be/src/exec/hdfs-scan-node.cc                      |   4 +-
 be/src/exec/kudu-scan-node.cc                      |   4 +-
 be/src/exec/plan-root-sink.cc                      |  15 +++
 be/src/exec/plan-root-sink.h                       |  10 ++
 be/src/exec/scan-node.cc                           |  10 +-
 be/src/exec/scan-node.h                            |   6 +-
 be/src/exec/scanner-context.cc                     |   1 -
 be/src/runtime/CMakeLists.txt                      |   3 +-
 ...-batch-queue.cc => blocking-row-batch-queue.cc} |  39 +++++--
 be/src/runtime/blocking-row-batch-queue.h          | 114 +++++++++++++++++++
 be/src/runtime/deque-row-batch-queue.cc            |  66 +++++++++++
 be/src/runtime/deque-row-batch-queue.h             |  70 ++++++++++++
 be/src/runtime/row-batch-queue.h                   |  80 -------------
 be/src/util/blocking-queue.h                       |  47 ++++----
 tests/query_test/test_result_spooling.py           |   7 +-
 20 files changed, 514 insertions(+), 160 deletions(-)

diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index 8e65176..05ebd30 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -38,21 +38,10 @@ BlockingPlanRootSink::BlockingPlanRootSink(
 
 Status BlockingPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ValidateCollectionSlots(*row_desc_, batch);
+  PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
+  RETURN_IF_ERROR(PlanRootSink::UpdateAndCheckRowsProducedLimit(state, batch));
   int current_batch_row = 0;
 
-  // Check to ensure that the number of rows produced by query execution does not exceed
-  // rows_returned_limit_. Since the BlockingPlanRootSink has a single producer, the
-  // num_rows_returned_ value can be verified without acquiring the lock_.
-  num_rows_produced_ += batch->num_rows();
-  if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) {
-    Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED,
-        PrintId(state->query_id()),
-        PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE));
-    VLOG_QUERY << err.msg().msg();
-    return err;
-  }
-
   // Don't enter the loop if batch->num_rows() == 0; no point triggering the consumer with
   // 0 rows to return. Be wary of ever returning 0-row batches to the client; some poorly
   // written clients may not cope correctly with them. See IMPALA-4335.
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index 71af942..cb95da8 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -90,8 +90,5 @@ class BlockingPlanRootSink : public PlanRootSink {
 
   /// Set by GetNext() to indicate to Send() how many rows it should write to results_.
   int num_rows_requested_ = 0;
-
-  /// Updated by Send() to indicate the total number of rows produced by query execution.
-  int64_t num_rows_produced_ = 0;
 };
 }
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index a7f2467..4ba2f07 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -16,30 +16,148 @@
 // under the License.
 
 #include "exec/buffered-plan-root-sink.h"
+#include "runtime/deque-row-batch-queue.h"
+#include "service/query-result-set.h"
+
+#include "common/names.h"
 
 namespace impala {
 
+// The maximum number of row batches to queue before calls to Send() start to block.
+// After this many row batches have been added, Send() will block until GetNext() reads
+// RowBatches from the queue.
+const uint32_t MAX_QUEUED_ROW_BATCHES = 10;
+
 BufferedPlanRootSink::BufferedPlanRootSink(
     TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : PlanRootSink(sink_id, row_desc, state) {}
+  : PlanRootSink(sink_id, row_desc, state),
+    batch_queue_(new DequeRowBatchQueue(MAX_QUEUED_ROW_BATCHES)) {}
 
 Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  // If the batch is empty, we have nothing to do so just return Status::OK().
+  if (batch->num_rows() == 0) return Status::OK();
+
+  // Close should only be called by the producer thread, and no RowBatches should be
+  // sent after the sink is closed.
+  DCHECK(!closed_);
+  DCHECK(batch_queue_->IsOpen());
+  PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
+  RETURN_IF_ERROR(PlanRootSink::UpdateAndCheckRowsProducedLimit(state, batch));
+
+  // Make a copy of the given RowBatch and place it on the queue.
+  unique_ptr<RowBatch> output_batch =
+      make_unique<RowBatch>(batch->row_desc(), batch->capacity(), mem_tracker());
+  batch->DeepCopyTo(output_batch.get());
+
+  {
+    // Add the copied batch to the RowBatch queue and wake up the consumer thread if it is
+    // waiting for rows to process.
+    unique_lock<mutex> l(lock_);
+
+    // If the queue is full, wait for the consumer thread to read batches from it.
+    while (!state->is_cancelled() && batch_queue_->IsFull()) {
+      batch_queue_has_capacity_.Wait(l);
+    }
+    RETURN_IF_CANCELLED(state);
+
+    // Add the batch to the queue and then notify the consumer that rows are available.
+    if (!batch_queue_->AddBatch(move(output_batch))) {
+      // Adding a batch should always be successful because the queue should always be
+      // open when Send is called, and the call to batch_queue_has_capacity_.Wait(l)
+      // ensures space is available.
+      DCHECK(false) << "DequeRowBatchQueue::AddBatch should never return false";
+    }
+  }
+  // Release the lock before calling notify so the consumer thread can immediately acquire
+  // the lock.
+  rows_available_.NotifyOne();
   return Status::OK();
 }
 
 Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  DCHECK(!closed_);
+  unique_lock<mutex> l(lock_);
+  sender_state_ = SenderState::EOS;
+  // If no batches are ever added, wake up the consumer thread so it can check the
+  // SenderState and return appropriately.
+  rows_available_.NotifyAll();
+  // Wait until the consumer has read all rows from the batch_queue_.
+  consumer_eos_.Wait(l);
+  RETURN_IF_CANCELLED(state);
   return Status::OK();
 }
 
 void BufferedPlanRootSink::Close(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  unique_lock<mutex> l(lock_);
+  // FlushFinal() won't have been called when the fragment instance encounters an error
+  // before sending all rows.
+  if (sender_state_ == SenderState::ROWS_PENDING) {
+    sender_state_ = SenderState::CLOSED_NOT_EOS;
+  }
+  batch_queue_->Close();
+  // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
+  // ensure that all sleeping threads are awoken. The call to NotifyAll() is not on the
+  // fast path so any overhead from calling it should be negligible.
+  rows_available_.NotifyAll();
   DataSink::Close(state);
 }
 
-void BufferedPlanRootSink::Cancel(RuntimeState* state) {}
+void BufferedPlanRootSink::Cancel(RuntimeState* state) {
+  DCHECK(state->is_cancelled());
+  // Wake up all sleeping threads so they can check the cancellation state.
+  // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
+  // ensure that all sleeping threads are awoken. The calls to NotifyAll() are not on the
+  // fast path so any overhead from calling it should be negligible.
+  rows_available_.NotifyAll();
+  consumer_eos_.NotifyAll();
+  batch_queue_has_capacity_.NotifyAll();
+}
 
 Status BufferedPlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
-  *eos = true;
-  return Status::OK();
+  {
+    unique_lock<mutex> l(lock_);
+    while (batch_queue_->IsEmpty() && sender_state_ == SenderState::ROWS_PENDING
+        && !state->is_cancelled()) {
+      rows_available_.Wait(l);
+    }
+
+    // If the query was cancelled while the sink was waiting for rows to become available,
+    // or if the query was cancelled before the current call to GetNext, set eos and then
+    // return. The queue could be empty if the sink was closed while waiting for rows to
+    // become available, or if the sink was closed before the current call to GetNext.
+    if (!state->is_cancelled() && !batch_queue_->IsEmpty()) {
+      unique_ptr<RowBatch> batch = batch_queue_->GetBatch();
+      // TODO for now, if num_results < batch->num_rows(), we terminate returning results
+      // early until we can properly handle fetch requests where
+      // num_results < batch->num_rows().
+      if (num_results > 0 && num_results < batch->num_rows()) {
+        *eos = true;
+        batch_queue_has_capacity_.NotifyOne();
+        consumer_eos_.NotifyOne();
+        return Status::Expected(TErrorCode::NOT_IMPLEMENTED_ERROR,
+            "BufferedPlanRootSink does not support setting num_results < BATCH_SIZE");
+      }
+      RETURN_IF_ERROR(
+          results->AddRows(output_expr_evals_, batch.get(), 0, batch->num_rows()));
+      batch->Reset();
+    }
+    *eos = batch_queue_->IsEmpty() && sender_state_ == SenderState::EOS;
+    if (*eos) consumer_eos_.NotifyOne();
+  }
+  // Release the lock before calling notify so the producer thread can immediately
+  // acquire the lock. It is safe to notify batch_queue_has_capacity_ regardless of
+  // whether a RowBatch is read. Either (1) a RowBatch is read and the queue is no longer
+  // full, so notify the producer thread or (2) a RowBatch was not read, which means
+  // either FlushFinal was called or the query was cancelled. If FlushFinal was called
+  // then the producer thread has completed. If the query is cancelled, then we wake up
+  // the producer thread so it can check the cancellation status and return. Releasing
+  // the lock is safe because the producer always loops until the queue actually has
+  // space.
+  batch_queue_has_capacity_.NotifyOne();
+  return state->GetQueryStatus();
 }
 }
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index f875988..939c2a0 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -18,27 +18,70 @@
 #pragma once
 
 #include "exec/plan-root-sink.h"
+#include "util/condition-variable.h"
 
 namespace impala {
 
+class DequeRowBatchQueue;
+
 /// PlanRootSink that buffers RowBatches from the 'sender' (fragment) thread. RowBatches
-/// are buffered in memory until a memory limit is hit. Any subsequent calls to Send will
-/// block until the 'consumer' (coordinator) thread has read enough RowBatches to free up
-/// sufficient memory.
+/// are buffered in memory until a max number of RowBatches are queued. Any subsequent
+/// calls to Send will block until the 'consumer' (coordinator) thread has read enough
+/// RowBatches to free up sufficient space in the queue. The blocking behavior follows
+/// the same semantics as BlockingPlanRootSink.
+///
+/// FlushFinal() blocks until the consumer has read all RowBatches from the queue or
+/// until the sink is either closed or cancelled. This ensures that the coordinator
+/// fragment stays alive until the client fetches all results, but allows all other
+/// fragments to complete and release their resources.
+///
+/// The sink assumes a non-thread safe RowBatchQueue is injected and uses a single lock to
+/// synchronize access to the queue.
 class BufferedPlanRootSink : public PlanRootSink {
  public:
-  BufferedPlanRootSink(
-      TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+  BufferedPlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
+      RuntimeState* state);
 
+  /// Creates a copy of the given RowBatch and adds it to the queue. The copy is
+  /// necessary as the ownership of 'batch' remains with the sender.
   virtual Status Send(RuntimeState* state, RowBatch* batch) override;
 
+  /// Notifies the consumer of producer eos and blocks until the consumer has read all
+  /// batches from the queue, or until the sink is either closed or cancelled.
   virtual Status FlushFinal(RuntimeState* state) override;
 
+  /// Releases resources and unblocks the consumer.
   virtual void Close(RuntimeState* state) override;
 
+  /// Blocks until rows are available for consumption.
   virtual Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) override;
 
+  /// Notifies both consumer and producer threads so they can check the cancellation
+  /// status.
   virtual void Cancel(RuntimeState* state) override;
+
+ private:
+  /// Protects the RowBatchQueue and all ConditionVariables.
+  boost::mutex lock_;
+
+  /// Waited on by the consumer inside GetNext() until rows are available for consumption.
+  /// Signaled when the producer adds a RowBatch to the queue. Also signaled by
+  /// FlushFinal(), Close() and Cancel() to unblock the consumer.
+  ConditionVariable rows_available_;
+
+  /// Waited on by the producer inside FlushFinal() until the consumer has hit eos.
+  /// Signaled when the consumer reads all RowBatches from the queue. Also signaled in
+  /// Cancel() to unblock the producer.
+  ConditionVariable consumer_eos_;
+
+  /// Waited on by the producer inside Send() if the RowBatchQueue is full. Signaled
+  /// when the consumer reads a batch from the RowBatchQueue. Also signaled in Cancel()
+  /// to unblock the producer.
+  ConditionVariable batch_queue_has_capacity_;
+
+  /// A DequeRowBatchQueue that buffers RowBatches from the sender for consumption by
+  /// the consumer. The queue is not thread safe and access is protected by 'lock_'.
+  std::unique_ptr<DequeRowBatchQueue> batch_queue_;
 };
 }
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f9bec90..e2d0ae9 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -33,6 +33,7 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/deque-row-batch-queue.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 2c64f97..f839bc4 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -25,12 +25,12 @@
 #include "exec/exec-node-util.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/scanner-context.h"
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/io/request-context.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
@@ -305,7 +305,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
     if (!first_thread) {
       // Cases 5, 6 and 7.
-      if (thread_state_.batch_queue()->AtCapacity()) break;
+      if (thread_state_.batch_queue()->IsFull()) break;
       if (!scanner_mem_limiter->ClaimMemoryForScannerThread(this, est_mem)) {
         COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1);
         break;
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 12b5365..fe08731 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -24,10 +24,10 @@
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr.h"
 #include "gutil/gscoped_ptr.h"
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
@@ -149,7 +149,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
     // * Don't start up a thread if there is not enough memory available for the
     //    estimated memory consumption (include reservation and non-reserved memory).
     if (!first_thread) {
-      if (thread_state_.batch_queue()->AtCapacity()) break;
+      if (thread_state_.batch_queue()->IsFull()) break;
       if (!mem_limiter->ClaimMemoryForScannerThread(
               this, EstimateScannerThreadMemConsumption())) {
         COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1);
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 6f9015e..43afdd0 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -60,4 +60,19 @@ void PlanRootSink::ValidateCollectionSlots(
   }
 #endif
 }
+
+Status PlanRootSink::UpdateAndCheckRowsProducedLimit(
+    RuntimeState* state, RowBatch* batch) {
+  // Since the PlanRootSink has a single producer, the
+  // num_rows_produced_ value can be verified without acquiring any locks.
+  num_rows_produced_ += batch->num_rows();
+  if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) {
+    Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED,
+        PrintId(state->query_id()),
+        PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE));
+    VLOG_QUERY << err.msg().msg();
+    return err;
+  }
+  return Status::OK();
+}
 }
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 7a40b8c..ae5651f 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -88,6 +88,11 @@ class PlanRootSink : public DataSink {
   /// SubplanNode with respect to setting collection-slots to NULL.
   void ValidateCollectionSlots(const RowDescriptor& row_desc, RowBatch* batch);
 
+  /// Check to ensure that the number of rows produced by query execution does not exceed
+  /// the NUM_ROWS_PRODUCED_LIMIT query option. Returns an error Status if the given
+  /// batch causes the limit to be exceeded. Updates the value of num_rows_produced_.
+  Status UpdateAndCheckRowsProducedLimit(RuntimeState* state, RowBatch* batch);
+
   /// State of the sender:
   /// - ROWS_PENDING: the sender is still producing rows; the only non-terminal state
   /// - EOS: the sender has passed all rows to Send()
@@ -96,8 +101,13 @@ class PlanRootSink : public DataSink {
   enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
   SenderState sender_state_ = SenderState::ROWS_PENDING;
 
+ private:
   /// Limit on the number of rows produced by this query, initialized by the constructor.
   const int64_t num_rows_produced_limit_;
+
+  /// Updated by UpdateAndCheckRowsProducedLimit() to indicate the total number of rows
+  /// produced by query execution.
+  int64_t num_rows_produced_ = 0;
 };
 }
 
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index ab05432..710d1ea 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -21,9 +21,9 @@
 #include <boost/bind.hpp>
 
 #include "exprs/scalar-expr.h"
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
@@ -259,8 +259,8 @@ void ScanNode::ScannerThreadState::Open(
   VLOG(2) << "Max row batch queue size for scan node '" << parent->id()
           << "' in fragment instance '" << PrintId(state->fragment_instance_id())
           << "': " << max_row_batches;
-  batch_queue_.reset(
-      new RowBatchQueue(max_row_batches, FLAGS_max_queued_row_batch_bytes));
+  batch_queue_.reset(new BlockingRowBatchQueue(max_row_batches,
+      FLAGS_max_queued_row_batch_bytes, row_batches_get_timer_, row_batches_put_timer_));
 
   // Start measuring the scanner thread concurrency only once the node is opened.
   average_concurrency_ = parent->runtime_profile()->AddSamplingCounter(
@@ -303,7 +303,7 @@ bool ScanNode::ScannerThreadState::EnqueueBatchWithTimeout(
   // Transfer memory ownership before enqueueing. If the caller retries, this transfer
   // is idempotent.
   (*row_batch)->SetMemTracker(row_batches_mem_tracker_);
-  if (!batch_queue_->BlockingPutWithTimeout(move(*row_batch), timeout_micros)) {
+  if (!batch_queue_->AddBatchWithTimeout(move(*row_batch), timeout_micros)) {
     return false;
   }
   COUNTER_ADD(row_batches_enqueued_, 1);
@@ -319,8 +319,6 @@ void ScanNode::ScannerThreadState::Close(ScanNode* parent) {
   scanner_threads_.JoinAll();
   DCHECK_EQ(num_active_.Load(), 0) << "There should be no active threads";
   if (batch_queue_ != nullptr) {
-    row_batches_put_timer_->Set(batch_queue_->total_put_wait_time());
-    row_batches_get_timer_->Set(batch_queue_->total_get_wait_time());
     row_batches_peak_mem_consumption_->Set(row_batches_mem_tracker_->peak_consumption());
     batch_queue_->Cleanup();
   }
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 68a3fad..7c3a800 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -26,7 +26,7 @@
 
 namespace impala {
 
-class RowBatchQueue;
+class BlockingRowBatchQueue;
 class TScanRange;
 
 /// Abstract base class of all scan nodes. Subclasses support different storage layers
@@ -253,7 +253,7 @@ class ScanNode : public ExecNode {
     bool EnqueueBatchWithTimeout(std::unique_ptr<RowBatch>* row_batch,
         int64_t timeout_micros);
 
-    RowBatchQueue* batch_queue() { return batch_queue_.get(); }
+    BlockingRowBatchQueue* batch_queue() { return batch_queue_.get(); }
     RuntimeProfile::ThreadCounters* thread_counters() const { return thread_counters_; }
     int max_num_scanner_threads() const { return max_num_scanner_threads_; }
     int64_t estimated_per_thread_mem() const { return estimated_per_thread_mem_; }
@@ -282,7 +282,7 @@ class ScanNode : public ExecNode {
     /// Outgoing row batches queue. Row batches are produced asynchronously by the scanner
     /// threads and consumed by the main fragment thread that calls GetNext() on the scan
     /// node.
-    boost::scoped_ptr<RowBatchQueue> batch_queue_;
+    boost::scoped_ptr<BlockingRowBatchQueue> batch_queue_;
 
     /// The number of scanner threads currently running.
     AtomicInt32 num_active_{0};
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a8b5bb2..7b44200 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -26,7 +26,6 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 242f02b..848ff37 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -28,6 +28,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 set_source_files_properties(${ROW_BATCH_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 
 add_library(Runtime
+  blocking-row-batch-queue.cc
   buffered-tuple-stream.cc
   client-cache.cc
   collection-value.cc
@@ -38,6 +39,7 @@ add_library(Runtime
   date-value.cc
   debug-options.cc
   descriptors.cc
+  deque-row-batch-queue.cc
   dml-exec-state.cc
   exec-env.cc
   fragment-instance-state.cc
@@ -62,7 +64,6 @@ add_library(Runtime
   reservation-manager.cc
   row-batch.cc
   ${ROW_BATCH_PROTO_SRCS}
-  row-batch-queue.cc
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/blocking-row-batch-queue.cc
similarity index 51%
rename from be/src/runtime/row-batch-queue.cc
rename to be/src/runtime/blocking-row-batch-queue.cc
index e694338..b711c9e 100644
--- a/be/src/runtime/row-batch-queue.cc
+++ b/be/src/runtime/blocking-row-batch-queue.cc
@@ -15,35 +15,54 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/row-batch-queue.h"
-
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/row-batch.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 
+using namespace std;
+
 namespace impala {
 
-RowBatchQueue::RowBatchQueue(int max_batches, int64_t max_bytes)
-  : BlockingQueue<unique_ptr<RowBatch>,RowBatchBytesFn>(max_batches, max_bytes) {}
+BlockingRowBatchQueue::BlockingRowBatchQueue(int max_batches, int64_t max_bytes,
+    RuntimeProfile::Counter* get_batch_wait_timer,
+    RuntimeProfile::Counter* add_batch_wait_timer)
+  : batch_queue_(new BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>(
+        max_batches, max_bytes, get_batch_wait_timer, add_batch_wait_timer)) {}
 
-RowBatchQueue::~RowBatchQueue() {
+BlockingRowBatchQueue::~BlockingRowBatchQueue() {
   DCHECK(cleanup_queue_.empty());
 }
 
-void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
-  if (!BlockingPut(move(batch))) {
+void BlockingRowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (!batch_queue_->BlockingPut(move(batch))) {
     lock_guard<SpinLock> l(lock_);
     cleanup_queue_.push_back(move(batch));
   }
 }
 
-unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+bool BlockingRowBatchQueue::AddBatchWithTimeout(
+    unique_ptr<RowBatch>&& batch, int64_t timeout_micros) {
+  return batch_queue_->BlockingPutWithTimeout(
+      forward<unique_ptr<RowBatch>>(batch), timeout_micros);
+}
+
+unique_ptr<RowBatch> BlockingRowBatchQueue::GetBatch() {
   unique_ptr<RowBatch> result;
-  if (BlockingGet(&result)) return result;
+  if (batch_queue_->BlockingGet(&result)) return result;
   return unique_ptr<RowBatch>();
 }
 
-void RowBatchQueue::Cleanup() {
+bool BlockingRowBatchQueue::IsFull() const {
+  return batch_queue_->AtCapacity();
+}
+
+void BlockingRowBatchQueue::Shutdown() {
+  batch_queue_->Shutdown();
+}
+
+void BlockingRowBatchQueue::Cleanup() {
   unique_ptr<RowBatch> batch = nullptr;
   while ((batch = GetBatch()) != nullptr) {
     batch.reset();
diff --git a/be/src/runtime/blocking-row-batch-queue.h b/be/src/runtime/blocking-row-batch-queue.h
new file mode 100644
index 0000000..f7fb63c
--- /dev/null
+++ b/be/src/runtime/blocking-row-batch-queue.h
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <list>
+#include <memory>
+
+#include "runtime/row-batch.h"
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Functor that returns the bytes in MemPool chunks for a row batch.
+/// Note that we don't include attached BufferPool::BufferHandle objects because this
+/// queue is only used in scan nodes that don't attach buffers.
+struct RowBatchBytesFn {
+  int64_t operator()(const std::unique_ptr<RowBatch>& batch) {
+    return batch->tuple_data_pool()->total_reserved_bytes();
+  }
+};
+
+/// Provides blocking queue semantics for row batches. Row batches have a property that
+/// they must be processed in the order they were produced, even in cancellation
+/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
+/// and we need to make sure those ptrs stay valid.
+///
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue, which can be cleaned up during Close().
+///
+/// The queue supports limiting the capacity in terms of bytes enqueued and number of
+/// batches to be enqueued.
+///
+/// The queue takes in two counters: 'get_batch_wait_timer' and 'add_batch_wait_timer'.
+/// 'get_batch_wait_timer' tracks how long GetBatch spends blocking waiting for batches
+/// to be added to the queue. 'add_batch_wait_timer' tracks how long AddBatch spends
+/// blocking waiting for space to be available in the queue.
+///
+/// All functions are thread safe.
+class BlockingRowBatchQueue {
+ public:
+  /// 'max_batches' is the maximum number of row batches that can be queued.
+  /// 'max_bytes' is the maximum number of bytes of row batches that can be queued (-1
+  /// means no limit).
+  /// 'get_batch_wait_timer' tracks how long GetBatch blocks waiting for batches.
+  /// 'add_batch_wait_timer' tracks how long AddBatch blocks waiting for space in the
+  /// queue.
+  /// When the queue is full, producers will block.
+  BlockingRowBatchQueue(int max_batches, int64_t max_bytes,
+      RuntimeProfile::Counter* get_batch_wait_timer,
+      RuntimeProfile::Counter* add_batch_wait_timer);
+  ~BlockingRowBatchQueue();
+
+  /// Adds a batch to the queue. This is blocking if the queue is full.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Adds a batch to the queue waiting for the specified amount of time for space to
+  /// be available in the queue. Returns true if the batch was successfully added to the
+  /// queue, false otherwise. 'batch' is passed by r-value reference because this method
+  /// does not transfer ownership of the 'batch'. This is necessary because this method
+  /// may or may not successfully add 'batch' to the queue (depending on if the timeout
+  /// was hit).
+  bool AddBatchWithTimeout(std::unique_ptr<RowBatch>&& batch, int64_t timeout_micros);
+
+  /// Gets a row batch from the queue, blocking if the queue is empty. Returns NULL if
+  /// the queue has already been shut down.
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Returns true if the queue is full, false otherwise. Does not account for the current
+  /// size of the cleanup queue. A queue is considered full if it either contains the max
+  /// number of row batches specified in the constructor, or it contains the max number
+  /// of bytes specified in the constructor.
+  bool IsFull() const;
+
+  /// Shuts down the underlying BlockingQueue. Future calls to AddBatch will put the
+  /// RowBatch on the cleanup queue. Future calls to GetBatch will continue to return
+  /// RowBatches from the BlockingQueue.
+  void Shutdown();
+
+  /// Resets all RowBatches currently in the queue and clears the cleanup_queue_. Not
+  /// valid to call AddBatch() after this is called. Finalizes all counters started for
+  /// this queue.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches enqueued after the RowBatchQueue has been closed.
+  /// They need to exist as preceding row batches may reference buffers owned by row
+  /// batches in this queue.
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+
+  /// BlockingQueue that stores the RowBatches
+  BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>* batch_queue_;
+};
+}
diff --git a/be/src/runtime/deque-row-batch-queue.cc b/be/src/runtime/deque-row-batch-queue.cc
new file mode 100644
index 0000000..6807b8b
--- /dev/null
+++ b/be/src/runtime/deque-row-batch-queue.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/deque-row-batch-queue.h"
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+DequeRowBatchQueue::DequeRowBatchQueue(int max_batches)
+  : max_batches_(max_batches) {}
+
+DequeRowBatchQueue::~DequeRowBatchQueue() {
+  DCHECK(closed_);
+}
+
+bool DequeRowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (closed_ || IsFull()) return false;
+  batch_queue_.push_back(move(batch));
+  return true;
+}
+
+unique_ptr<RowBatch> DequeRowBatchQueue::GetBatch() {
+  if (closed_ || IsEmpty()) return unique_ptr<RowBatch>();
+  unique_ptr<RowBatch> result = move(batch_queue_.front());
+  batch_queue_.pop_front();
+  return result;
+}
+
+bool DequeRowBatchQueue::IsFull() const {
+  return batch_queue_.size() == max_batches_;
+}
+
+bool DequeRowBatchQueue::IsEmpty() const {
+  return batch_queue_.empty();
+}
+
+bool DequeRowBatchQueue::IsOpen() const {
+  return !closed_;
+}
+
+void DequeRowBatchQueue::Close() {
+  if (closed_) return;
+  while (!batch_queue_.empty()) {
+    unique_ptr<RowBatch> result = GetBatch();
+    result->Reset();
+  }
+  batch_queue_.clear();
+  closed_ = true;
+}
+}
diff --git a/be/src/runtime/deque-row-batch-queue.h b/be/src/runtime/deque-row-batch-queue.h
new file mode 100644
index 0000000..95cc2af
--- /dev/null
+++ b/be/src/runtime/deque-row-batch-queue.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <queue>
+
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// A RowBatchQueue that provides non-blocking queue semantics. RowBatches are stored
+/// inside a std::deque. None of the methods block, and this class is not thread safe.
+/// The size of the queue can be capped by the 'max_batches' parameter. Calls to AddBatch
+/// after the capacity has been reached will return false. Calls to GetBatch on an empty
+/// queue will return null.
+class DequeRowBatchQueue {
+ public:
+  DequeRowBatchQueue(int max_batches);
+  ~DequeRowBatchQueue();
+
+  /// Adds the given RowBatch to the queue. Returns true if the batch was successfully
+  /// added, returns false if the queue is full or has already been closed. The ownership
+  /// of the given batch is transferred from the 'batch' to the queue.
+  bool AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Returns and removes the RowBatch at the head of the queue. Returns a nullptr if the
+  /// queue is already closed or the queue is empty. The ownership of the returned batch
+  /// is transferred from the queue to the returned unique_ptr.
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Returns true if the queue limit has been reached, false otherwise.
+  bool IsFull() const;
+
+  /// Returns true if the queue is empty, false otherwise.
+  bool IsEmpty() const;
+
+  /// Returns false if Close() has been called, true otherwise.
+  bool IsOpen() const;
+
+  /// Resets the remaining RowBatches in the queue and releases the queue memory.
+  void Close();
+
+ private:
+  /// Queue of row batches.
+  std::deque<std::unique_ptr<RowBatch>> batch_queue_;
+
+  /// True if the queue has been closed, false otherwise.
+  bool closed_ = false;
+
+  /// The max size of the queue.
+  const int max_batches_;
+};
+}
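The non-blocking contract described in the DequeRowBatchQueue header above can be sketched in a few lines of Python. This is only an illustration of the semantics (reject when full or closed, return nothing when empty or closed), not the Impala implementation; the class name BoundedBatchQueue and its method names are hypothetical, and the real Close() additionally calls Reset() on each remaining RowBatch.

```python
from collections import deque


class BoundedBatchQueue:
    """Non-blocking, single-threaded FIFO with a fixed capacity (illustrative only)."""

    def __init__(self, max_batches):
        self._max_batches = max_batches
        self._batches = deque()
        self._closed = False

    def add_batch(self, batch):
        # Reject instead of blocking when the queue is closed or full.
        if self._closed or self.is_full():
            return False
        self._batches.append(batch)
        return True

    def get_batch(self):
        # Return None instead of blocking when the queue is closed or empty.
        if self._closed or not self._batches:
            return None
        return self._batches.popleft()

    def is_full(self):
        return len(self._batches) == self._max_batches

    def is_empty(self):
        return not self._batches

    def is_open(self):
        return not self._closed

    def close(self):
        # Idempotent: drop whatever is still queued, then mark the queue closed.
        if self._closed:
            return
        self._batches.clear()
        self._closed = True
```

Because nothing blocks, callers are expected to check the return values (or is_full() / is_empty()) themselves and decide how to wait or retry.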
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
deleted file mode 100644
index 79e8293..0000000
--- a/be/src/runtime/row-batch-queue.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BLOCKING_QUEUE_H
-#define IMPALA_RUNTIME_BLOCKING_QUEUE_H
-
-#include <list>
-#include <memory>
-
-#include "runtime/row-batch.h"
-#include "util/blocking-queue.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-class RowBatch;
-
-/// Functor that returns the bytes in MemPool chunks for a row batch.
-/// Note that we don't include attached BufferPool::BufferHandle objects because this
-/// queue is only used in scan nodes that don't attach buffers.
-struct RowBatchBytesFn {
-  int64_t operator()(const std::unique_ptr<RowBatch>& batch) {
-    return batch->tuple_data_pool()->total_reserved_bytes();
-  }
-};
-
-/// Extends blocking queue for row batches. Row batches have a property that
-/// they must be processed in the order they were produced, even in cancellation
-/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
-/// and we need to make sure those ptrs stay valid.
-/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
-/// queue, which can be cleaned up during Close().
-///
-/// The queue supports limiting the capacity in terms of bytes enqueued.
-///
-/// All functions are thread safe.
-class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn> {
- public:
-  /// 'max_batches' is the maximum number of row batches that can be queued.
-  /// 'max_bytes' is the maximum number of bytes of row batches that can be queued (-1
-  /// means no limit).
-  /// When the queue is full, producers will block.
-  RowBatchQueue(int max_batches, int64_t max_bytes);
-  ~RowBatchQueue();
-
-  /// Adds a batch to the queue. This is blocking if the queue is full.
-  void AddBatch(std::unique_ptr<RowBatch> batch);
-
-  /// Gets a row batch from the queue. Returns NULL if there are no more.
-  /// This function blocks.
-  /// Returns NULL after Shutdown().
-  std::unique_ptr<RowBatch> GetBatch();
-
-  /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
-  /// after this is called.
-  void Cleanup();
-
- private:
-  /// Lock protecting cleanup_queue_
-  SpinLock lock_;
-
-  /// Queue of orphaned row batches
-  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
-};
-}
-#endif
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 34c2453..811e029 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -29,6 +29,7 @@
 #include "common/compiler-util.h"
 #include "util/aligned-new.h"
 #include "util/condition-variable.h"
+#include "util/runtime-profile.h"
 #include "util/stopwatch.h"
 #include "util/time.h"
 
@@ -59,15 +60,21 @@ struct ByteLimitDisabledFn {
 /// held before 'put_lock_'. When the 'get_list_' is empty, the caller of BlockingGet()
 /// will atomically swap the 'put_list_' with 'get_list_'. The swapping happens with both
 /// the 'get_lock_' and 'put_lock_' held.
+///
+/// The queue supports two optional RuntimeProfile::Counters. One to track the amount
+/// of time spent blocking in BlockingGet() and the other to track the amount of time
+/// spent in BlockingPut().
 template <typename T, typename ElemBytesFn = ByteLimitDisabledFn<T>>
 class BlockingQueue : public CacheLineAligned {
  public:
-  BlockingQueue(size_t max_elements, int64_t max_bytes = -1)
+  BlockingQueue(size_t max_elements, int64_t max_bytes = -1,
+      RuntimeProfile::Counter* get_wait_timer = nullptr,
+      RuntimeProfile::Counter* put_wait_timer = nullptr)
     : shutdown_(false),
       max_elements_(max_elements),
-      total_put_wait_time_(0),
+      put_wait_timer_(put_wait_timer),
       get_list_size_(0),
-      total_get_wait_time_(0),
+      get_wait_timer_(get_wait_timer),
       max_bytes_(max_bytes) {
     DCHECK(max_bytes == -1 || max_bytes > 0) << max_bytes;
     DCHECK_GT(max_elements_, 0);
@@ -98,15 +105,15 @@ class BlockingQueue : public CacheLineAligned {
         put_cv_.NotifyOne();
         // Sleep with 'get_lock_' held to block off other readers which cannot
         // make progress anyway.
-        timer.Start();
+        if (get_wait_timer_ != nullptr) timer.Start();
         get_cv_.Wait(write_lock);
-        timer.Stop();
+        if (get_wait_timer_ != nullptr) timer.Stop();
       }
       DCHECK(!put_list_.empty());
       put_list_.swap(get_list_);
       get_list_size_.Store(get_list_.size());
       write_lock.unlock();
-      total_get_wait_time_ += timer.ElapsedTime();
+      if (get_wait_timer_ != nullptr) get_wait_timer_->Add(timer.ElapsedTime());
     }
 
     DCHECK(!get_list_.empty());
@@ -143,11 +150,11 @@ class BlockingQueue : public CacheLineAligned {
     DCHECK_GE(val_bytes, 0);
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
     while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_) {
-      timer.Start();
+      if (put_wait_timer_ != nullptr) timer.Start();
       put_cv_.Wait(write_lock);
-      timer.Stop();
+      if (put_wait_timer_ != nullptr) timer.Stop();
     }
-    total_put_wait_time_ += timer.ElapsedTime();
+    if (put_wait_timer_ != nullptr) put_wait_timer_->Add(timer.ElapsedTime());
     if (UNLIKELY(shutdown_)) return false;
 
     DCHECK_LT(put_list_.size(), max_elements_);
@@ -173,12 +180,12 @@ class BlockingQueue : public CacheLineAligned {
     TimeFromNowMicros(timeout_micros, &abs_time);
     bool notified = true;
     while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_ && notified) {
-      timer.Start();
+      if (put_wait_timer_ != nullptr) timer.Start();
       // Wait until we're notified or until the timeout expires.
       notified = put_cv_.WaitUntil(write_lock, abs_time);
-      timer.Stop();
+      if (put_wait_timer_ != nullptr) timer.Stop();
     }
-    total_put_wait_time_ += timer.ElapsedTime();
+    if (put_wait_timer_ != nullptr) put_wait_timer_->Add(timer.ElapsedTime());
     // If the list is still full or if the queue has been shut down, return false.
     // NOTE: We don't check 'notified' here as it appears that pthread condition variables
     // have a weird behavior in which they can return ETIMEDOUT from timed_wait even if
@@ -215,18 +222,6 @@ class BlockingQueue : public CacheLineAligned {
     return SizeLocked(write_lock) >= max_elements_;
   }
 
-  int64_t total_get_wait_time() const {
-    // Hold lock to make sure the value read is consistent (i.e. no torn read).
-    boost::lock_guard<boost::mutex> read_lock(get_lock_);
-    return total_get_wait_time_;
-  }
-
-  int64_t total_put_wait_time() const {
-    // Hold lock to make sure the value read is consistent (i.e. no torn read).
-    boost::lock_guard<boost::mutex> write_lock(put_lock_);
-    return total_put_wait_time_;
-  }
-
  private:
 
   uint32_t ALWAYS_INLINE SizeLocked(const boost::unique_lock<boost::mutex>& lock) const {
@@ -277,7 +272,7 @@ class BlockingQueue : public CacheLineAligned {
   ConditionVariable put_cv_;
 
   /// Total amount of time threads blocked in BlockingPut(). Guarded by 'put_lock_'.
-  int64_t total_put_wait_time_;
+  RuntimeProfile::Counter* put_wait_timer_ = nullptr;
 
   /// Running counter for bytes enqueued, incremented through the producer thread.
   /// Decremented by transferring value from 'get_bytes_dequeued_'.
@@ -300,7 +295,7 @@ class BlockingQueue : public CacheLineAligned {
   /// Total amount of time a thread blocked in BlockingGet(). Guarded by 'get_lock_'.
   /// Note that a caller of BlockingGet() may sleep with 'get_lock_' held and this
   /// variable doesn't include the time which other threads block waiting for 'get_lock_'.
-  int64_t total_get_wait_time_;
+  RuntimeProfile::Counter* get_wait_timer_ = nullptr;
 
   /// Running count of bytes dequeued. Decremented from 'put_bytes_enqueued_' when it
   /// exceeds the queue capacity. Kept separate from 'put_bytes_enqueued_' so that
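The blocking-queue.h change above replaces the internal wait-time totals with optional caller-supplied counters, and only measures time when a counter is present. A minimal Python sketch of that pattern follows. Counter is a hypothetical stand-in for RuntimeProfile::Counter, timed_wait is a made-up helper, and the nanosecond unit is an assumption of this sketch rather than something stated in the diff.

```python
import time


class Counter:
    """Hypothetical stand-in for a profile counter: accumulates a value."""

    def __init__(self):
        self.value = 0

    def add(self, delta):
        self.value += delta


def timed_wait(wait_fn, wait_timer=None):
    """Runs wait_fn() and records the elapsed time only if a counter was supplied."""
    if wait_timer is None:
        # No counter registered: skip the clock reads entirely.
        wait_fn()
        return
    start = time.monotonic_ns()
    wait_fn()
    wait_timer.add(time.monotonic_ns() - start)
```

Passing the counter in, rather than exposing accessor methods, lets the owner of the queue decide where the wait time is reported, and callers that do not care about the measurement simply omit it.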
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 5f4c5e7..40b55b6 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -24,10 +24,9 @@ class TestResultSpooling(ImpalaTestSuite):
     return 'functional-query'
 
   def test_result_spooling(self):
-    """Test that setting SPOOL_QUERY_RESULTS does not crash Impala. The implementation
-    of query result spooling has not been completed yet, so queries that run when
-    SPOOL_QUERY_RESULTS = true, will return no results."""
+    """Tests that setting SPOOL_QUERY_RESULTS = true for simple queries returns the
+    correct number of results."""
     query_opts = {"spool_query_results": "true"}
     query = "select * from functional.alltypes limit 10"
     result = self.execute_query_expect_success(self.client, query, query_opts)
-    assert(len(result.data) == 0)
+    assert(len(result.data) == 10)


[impala] 05/05: IMPALA-8636: fix flakiness of ACID INSERT tests

Posted by st...@apache.org.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 48bb93d4744f54f609f4f81580b17ef39d1f1a2b
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jul 31 17:02:42 2019 +0200

    IMPALA-8636: fix flakiness of ACID INSERT tests
    
    I had to add @UniqueDatabase.parametrize(sync_ddl=True) to some e2e
    tests because they were broken in exhaustive mode. When the tests run
    with sync_ddl=True, the test files are executed against multiple
    impalads, which means that each statement in the .test file is
    executed against a random impalad.
    
    Change-Id: Ic724e77833ed9ea58268e1857de0d33f9577af8b
    Reviewed-on: http://gerrit.cloudera.org:8080/13966
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/query_test/test_insert.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 95df39b..41cce30 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -155,11 +155,13 @@ class TestInsertQueries(ImpalaTestSuite):
     self.hive_client.setMetaConf("metastore.client.capability.check", capability_check)
 
   @SkipIfHive2.acid
+  @UniqueDatabase.parametrize(sync_ddl=True)
   def test_acid_nonacid_insert(self, vector, unique_database):
     self.run_test_case('QueryTest/acid-nonacid-insert', vector, unique_database,
         multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)
 
   @SkipIfHive2.acid
+  @UniqueDatabase.parametrize(sync_ddl=True)
   def test_acid_insert_fail(self, vector, unique_database):
     self.run_test_case('QueryTest/acid-insert-fail', vector, unique_database,
         multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)


[impala] 04/05: IMPALA-8812: [DOCS] Negative index support in SPLIT_PART function

Posted by st...@apache.org.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e8bd307941f8734d24b5e3ce61e2b319f59563c5
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Wed Jul 31 12:56:10 2019 -0700

    IMPALA-8812: [DOCS] Negative index support in SPLIT_PART function
    
    Change-Id: I1b1810d317167fae5e0b050dfd6a7dd7a7762bb3
    Reviewed-on: http://gerrit.cloudera.org:8080/13970
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Norbert Luksa <no...@cloudera.com>
    Reviewed-by: Alex Rodoni <ar...@cloudera.com>
---
 docs/topics/impala_string_functions.xml | 115 +++++++++++---------------------
 1 file changed, 39 insertions(+), 76 deletions(-)

diff --git a/docs/topics/impala_string_functions.xml b/docs/topics/impala_string_functions.xml
index 580d7c3..2d2c760 100644
--- a/docs/topics/impala_string_functions.xml
+++ b/docs/topics/impala_string_functions.xml
@@ -1396,94 +1396,57 @@ select replace('hello world','xyz','abc');
       <dlentry rev="2.3.0 IMPALA-2084" id="split_part">
 
         <dt>
-          SPLIT_PART(STRING source, STRING delimiter, BIGINT n)
+          SPLIT_PART(STRING source, STRING delimiter, BIGINT index)
         </dt>
 
         <dd>
-          <b>Purpose:</b> Returns the nth field within a delimited string. The fields are
-          numbered starting from 1. The delimiter can consist of multiple characters, not just a
-          single character. All matching of the delimiter is done exactly, not using any regular
-          expression patterns.
+          <b>Purpose:</b> Returns the requested <codeph>index</codeph>th part of the input
+          <varname>source</varname> string split by the <varname>delimiter</varname>.
+          <ul>
+            <li>
+              If <varname>index</varname> is a positive number, returns the
+              <varname>index</varname>th part from the left within the <varname>source</varname>
+              string.
+            </li>
+
+            <li>
+              If <varname>index</varname> is a negative number, returns the
+              <varname>index</varname>th part from the right within the
+              <varname>source</varname> string.
+            </li>
+
+            <li>
+              If <varname>index</varname> is 0, returns an error.
+            </li>
+          </ul>
+          <p>
+            The <varname>delimiter</varname> can consist of multiple characters, not just a
+            single character.
+          </p>
+          <p>
+            All matching of the delimiter is done exactly, not using any regular expression
+            patterns.
+          </p>
           <p>
             <b>Return type:</b> <codeph>STRING</codeph>
           </p>
-
-          <p conref="../shared/impala_common.xml#common/regexp_re2"/>
-
-          <p conref="../shared/impala_common.xml#common/regexp_re2_warning"/>
-
-          <p conref="../shared/impala_common.xml#common/regexp_escapes"/>
-
-          <p conref="../shared/impala_common.xml#common/example_blurb"/>
-
+          <p conref="../shared/impala_common.xml#common/example_blurb"
+              />
           <p>
-            These examples show how to retrieve the nth field from a delimited string:
+            <codeph>SPLIT_PART('x,y,z',',',2)</codeph> returns <codeph>'y'</codeph>.
           </p>
-<codeblock><![CDATA[
-select split_part('x,y,z',',',1);
-+-----------------------------+
-| split_part('x,y,z', ',', 1) |
-+-----------------------------+
-| x                           |
-+-----------------------------+
-
-select split_part('x,y,z',',',2);
-+-----------------------------+
-| split_part('x,y,z', ',', 2) |
-+-----------------------------+
-| y                           |
-+-----------------------------+
-
-select split_part('x,y,z',',',3);
-+-----------------------------+
-| split_part('x,y,z', ',', 3) |
-+-----------------------------+
-| z                           |
-+-----------------------------+
-]]>
-</codeblock>
           <p>
-            These examples show what happens for out-of-range field positions. Specifying a
-            value less than 1 produces an error. Specifying a value greater than the number of
-            fields returns a zero-length string (which is not the same as
-            <codeph>NULL</codeph>).
+            <codeph>SPLIT_PART('one***two***three','***',2)</codeph> returns
+            <codeph>'two'</codeph>.
           </p>
-<codeblock><![CDATA[
-select split_part('x,y,z',',',0);
-ERROR: Invalid field position: 0
-
-with t1 as (select split_part('x,y,z',',',4) nonexistent_field)
-  select
-      nonexistent_field
-    , concat('[',nonexistent_field,']')
-    , length(nonexistent_field);
-from t1
-+-------------------+-------------------------------------+---------------------------+
-| nonexistent_field | concat('[', nonexistent_field, ']') | length(nonexistent_field) |
-+-------------------+-------------------------------------+---------------------------+
-|                   | []                                  | 0                         |
-+-------------------+-------------------------------------+---------------------------+
-]]>
-</codeblock>
           <p>
-            These examples show how the delimiter can be a multi-character value:
+            <codeph>SPLIT_PART('abc@@def@@ghi', '@@', 3)</codeph> returns
+            <codeph>'ghi'</codeph>.
+          </p>
+          <p>
+            <codeph>SPLIT_PART('abc@@def@@ghi', '@@', -3)</codeph> returns
+            <codeph>'abc'</codeph>.
           </p>
-<codeblock><![CDATA[
-select split_part('one***two***three','***',2);
-+-------------------------------------------+
-| split_part('one***two***three', '***', 2) |
-+-------------------------------------------+
-| two                                       |
-+-------------------------------------------+
-
-select split_part('one\|/two\|/three','\|/',3);
-+-------------------------------------------+
-| split_part('one\|/two\|/three', '\|/', 3) |
-+-------------------------------------------+
-| three                                     |
-+-------------------------------------------+
-]]>
-</codeblock>
         </dd>
 
       </dlentry>
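The indexing rules documented above for SPLIT_PART can be modelled with a short Python function. This is a sketch of the documented behaviour, not Impala's implementation. Two points are assumptions: the empty-string result for an out-of-range positive index is carried over from the older documentation text removed in this diff, and the same result for an out-of-range negative index is assumed by symmetry. The error text for index 0 is copied from the removed example.

```python
def split_part(source, delimiter, index):
    """Returns the index-th part of source split by delimiter (1-based).

    Positive indexes count from the left, negative indexes from the right.
    """
    if index == 0:
        raise ValueError("Invalid field position: 0")
    # The delimiter is matched literally, never as a regular expression.
    parts = source.split(delimiter)
    pos = index - 1 if index > 0 else len(parts) + index
    if pos < 0 or pos >= len(parts):
        # Assumed behaviour: out-of-range positions yield an empty string.
        return ""
    return parts[pos]
```

For example, split_part('abc@@def@@ghi', '@@', -3) walks three parts back from the right and lands on the first part, 'abc', which matches the last example in the new documentation.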


[impala] 03/05: IMPALA-8600: Refresh transactional tables

Posted by st...@apache.org.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d819655118c8c6e82649e3c3821311f3dd01174
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Sat Jul 27 12:26:20 2019 +0200

    IMPALA-8600: Refresh transactional tables
    
    Refreshing a subset of partitions in a transactional table might
    leave that table in an inconsistent state. As a fix, user-initiated
    partition refreshes are no longer allowed on ACID tables.
    Additionally, a refresh partition Metastore event now triggers a
    refresh of the whole ACID table.
    
    An optimisation checks the locally known latest table-level writeId
    against the one fetched from HMS and refreshes only if they don't
    match.
    This couldn't be done for partitioned tables because Hive apparently
    doesn't update the table-level writeId if the transactional table is
    partitioned. Similarly, checking the writeId of each partition and
    refreshing only the ones whose writeId is out of date is not feasible
    either, because Hive doesn't update the writeId at either the table
    level or the partition level when it makes schema changes such as
    adding a column. So after adding a column in Hive to a partitioned
    ACID table and refreshing that table in Impala, Impala still wouldn't
    see the new column. Hence, I unconditionally refresh the whole table
    if it's ACID and partitioned. Note that for non-partitioned ACID
    tables Hive updates the table-level writeId even for schema changes.
    
    Change-Id: I1851da22452074dbe253bcdd97145e06c7552cd3
    Reviewed-on: http://gerrit.cloudera.org:8080/13938
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/ResetMetadataStmt.java  |  14 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |   7 +
 .../impala/catalog/events/MetastoreEvents.java     | 185 ++++++++++++---------
 .../apache/impala/service/CatalogOpExecutor.java   |  52 +++++-
 .../java/org/apache/impala/util/AcidUtils.java     |   2 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   9 +
 .../functional-query/queries/QueryTest/acid.test   |   8 +
 tests/custom_cluster/test_event_processing.py      |  28 +++-
 8 files changed, 220 insertions(+), 85 deletions(-)
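The refresh rules from the commit message can be summarised as a small decision function. This is a reading of the commit message only, written as a Python sketch; plan_refresh, its parameters, and the returned action strings are hypothetical names and do not correspond to identifiers in the Java changes below.

```python
def plan_refresh(is_transactional, is_partitioned, partition_requested,
                 local_write_id=None, hms_write_id=None):
    """Decides what a user-initiated REFRESH should do for a table."""
    if is_transactional and partition_requested:
        # Partition-level refresh could leave an ACID table inconsistent.
        raise ValueError("Refreshing a partition is not allowed on transactional "
                         "tables. Try to refresh the whole table instead.")
    if not is_transactional:
        return "refresh_partition" if partition_requested else "refresh_table"
    if is_partitioned:
        # The table-level writeId is not reliable here, so always reload.
        return "refresh_table"
    # Non-partitioned ACID table: reload only if the writeId has moved on.
    return "skip" if local_write_id == hms_write_id else "refresh_table"
```

The asymmetry between the last two branches reflects the observation in the commit message that Hive updates the table-level writeId for non-partitioned ACID tables (even on schema changes) but not for partitioned ones.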

diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index ac4e69e..4200dd8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -22,12 +22,15 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.AcidUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -150,6 +153,17 @@ public class ResetMetadataStmt extends StatementBase {
                 tableName_);
           }
           if (partitionSpec_ != null) {
+            try {
+              // Get local table info without reaching out to HMS
+              FeTable table = analyzer.getTable(dbName, tableName_.getTbl());
+              if (AcidUtils.isTransactionalTable(
+                      table.getMetaStoreTable().getParameters())) {
+                throw new AnalysisException("Refreshing a partition is not allowed on " +
+                    "transactional tables. Try to refresh the whole table instead.");
+              }
+            } catch (TableLoadingException e) {
+              throw new AnalysisException(e);
+            }
             partitionSpec_.setPrivilegeRequirement(Privilege.ANY);
             partitionSpec_.analyze(analyzer);
           }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2bb579f..8f440f2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1897,4 +1897,11 @@ public class HdfsTable extends Table implements FeFsTable {
     tmpTable.setTableStats(msTbl);
     return tmpTable;
   }
+
+  /**
+   * Returns true if the table is partitioned, false otherwise.
+   */
+  public boolean isPartitioned() {
+    return getMetaStoreTable().getPartitionKeysSize() > 0;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index d626b52..315a38c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -55,6 +55,7 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.ClassUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
@@ -602,6 +603,21 @@ public class MetastoreEvents {
       String partString = FileUtils.makePartName(partitionCols, partitionVals);
       return partString;
     }
+
+    /*
+     * Helper function to initiate a table reload on Catalog. Re-throws the exception if
+     * the catalog operation throws.
+     */
+    protected void reloadTableFromCatalog(String operation) throws CatalogException {
+      if (!catalog_.reloadTableIfExists(dbName_, tblName_,
+              "Processing " + operation + " event from HMS")) {
+        debugLog("Automatic refresh on table {} failed as the table is not "
+            + "present either in catalog or metastore.", getFullyQualifiedTblName());
+      } else {
+        infoLog("Table {} has been refreshed after " + operation +".",
+            getFullyQualifiedTblName());
+      }
+    }
   }
 
   /**
@@ -729,7 +745,7 @@ public class MetastoreEvents {
   public static class InsertEvent extends MetastoreTableEvent {
 
     // Represents the partition for this insert. Null if the table is unpartitioned.
-    private final org.apache.hadoop.hive.metastore.api.Partition insertPartition_;
+    private Partition insertPartition_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -760,9 +776,14 @@ public class MetastoreEvents {
      */
     @Override
     public void process() throws MetastoreNotificationException {
-      if (insertPartition_ != null)
+      // Reload the whole table if it's a transactional table.
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+        insertPartition_ = null;
+      }
+
+      if (insertPartition_ != null) {
         processPartitionInserts();
-      else {
+      } else {
         processTableInserts();
       }
     }
@@ -812,14 +833,7 @@ public class MetastoreEvents {
       try {
         // Ignore event if table or database is not in the catalog. Throw exception if
         // refresh fails.
-        if (!catalog_.reloadTableIfExists(dbName_, tblName_,
-              "processing table-level INSERT event from HMS")) {
-          debugLog("Automatic refresh table {} failed as the table is not "
-              + "present either catalog or metastore.", getFullyQualifiedTblName());
-        } else {
-          infoLog("Table {} has been refreshed after insert.",
-              getFullyQualifiedTblName());
-        }
+        reloadTableFromCatalog("table-level INSERT");
       } catch (DatabaseNotFoundException e) {
         debugLog("Automatic refresh of table {} insert failed as the "
             + "database is not present in the catalog.", getFullyQualifiedTblName());
@@ -1357,28 +1371,33 @@ public class MetastoreEvents {
       // Notification is created for newly created partitions only. We need not worry
       // about "IF NOT EXISTS".
       try {
-        boolean success = true;
-        // HMS adds partitions in a transactional way. This means there may be multiple
-        // HMS partition objects in an add_partition event. We try to do the same here by
-        // refreshing all those partitions in a loop. If any partition refresh fails, we
-        // throw MetastoreNotificationNeedsInvalidateException exception. We skip
-        // refresh of the partitions if the table is not present in the catalog.
-        infoLog("Trying to refresh {} partitions added to table {} in the event",
-            addedPartitions_.size(), getFullyQualifiedTblName());
-        for (Partition partition : addedPartitions_) {
-          List<TPartitionKeyValue> tPartSpec =
-              getTPartitionSpecFromHmsPartition(msTbl_, partition);
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-              "processing ADD_PARTITION event from HMS")) {
-            debugLog("Refresh partitions on table {} failed "
-                + "as table was not present in the catalog.", getFullyQualifiedTblName());
-            success = false;
-            break;
+        // Reload the whole table if it's a transactional table.
+        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+          reloadTableFromCatalog("ADD_PARTITION");
+        } else {
+          boolean success = true;
+          // HMS adds partitions in a transactional way. This means there may be multiple
+          // HMS partition objects in an add_partition event. We try to do the same here
+          // by refreshing all those partitions in a loop. If any partition refresh fails,
+          // we throw MetastoreNotificationNeedsInvalidateException exception. We skip
+          // refresh of the partitions if the table is not present in the catalog.
+          infoLog("Trying to refresh {} partitions added to table {} in the event",
+              addedPartitions_.size(), getFullyQualifiedTblName());
+          for (Partition partition : addedPartitions_) {
+            List<TPartitionKeyValue> tPartSpec =
+                getTPartitionSpecFromHmsPartition(msTbl_, partition);
+            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+                "processing ADD_PARTITION event from HMS")) {
+              debugLog("Refresh partitions on table {} failed as table was not present " +
+                  "in the catalog.", getFullyQualifiedTblName());
+              success = false;
+              break;
+            }
+          }
+          if (success) {
+            infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
+                getFullyQualifiedTblName());
           }
-        }
-        if (success) {
-          infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
-              getFullyQualifiedTblName());
         }
       } catch (DatabaseNotFoundException e) {
         debugLog("Refresh partitions on table {} after add_partitions event failed as "
@@ -1448,34 +1467,39 @@ public class MetastoreEvents {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
-      // Refresh the partition that was altered.
-      Preconditions.checkNotNull(partitionAfter_);
-      List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
-          partitionAfter_);
-      try {
-        // Ignore event if table or database is not in catalog. Throw exception if
-        // refresh fails.
-
-        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-            "processing ALTER_PARTITION event from HMS")) {
-          debugLog("Refresh of table {} partition {} failed as the table "
-                  + "is not present in the catalog.", getFullyQualifiedTblName(),
-              constructPartitionStringFromTPartitionSpec(tPartSpec));
-        } else {
-          infoLog("Table {} partition {} has been refreshed", getFullyQualifiedTblName(),
+      // Reload the whole table if it's a transactional table.
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+        reloadTableFromCatalog("ALTER_PARTITION");
+      } else {
+        // Refresh the partition that was altered.
+        Preconditions.checkNotNull(partitionAfter_);
+        List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
+            partitionAfter_);
+        try {
+          // Ignore event if table or database is not in catalog. Throw exception if
+          // refresh fails.
+          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+              "processing ALTER_PARTITION event from HMS")) {
+            debugLog("Refresh of table {} partition {} failed as the table "
+                    + "is not present in the catalog.", getFullyQualifiedTblName(),
+                constructPartitionStringFromTPartitionSpec(tPartSpec));
+          } else {
+            infoLog("Table {} partition {} has been refreshed",
+                getFullyQualifiedTblName(),
+                constructPartitionStringFromTPartitionSpec(tPartSpec));
+          }
+        } catch (DatabaseNotFoundException e) {
+          debugLog("Refresh of table {} partition {} "
+                  + "event failed as the database is not present in the catalog.",
+              getFullyQualifiedTblName(),
               constructPartitionStringFromTPartitionSpec(tPartSpec));
+        } catch (CatalogException e) {
+          throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
+                  + "partition on table {} partition {} failed. Event processing cannot "
+                  + "continue. Issue an invalidate command to reset the event processor "
+                  + "state.", getFullyQualifiedTblName(),
+              constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
         }
-      } catch (DatabaseNotFoundException e) {
-        debugLog("Refresh of table {} partition {} "
-                + "event failed as the database is not present in the catalog.",
-            getFullyQualifiedTblName(),
-            constructPartitionStringFromTPartitionSpec(tPartSpec));
-      } catch (CatalogException e) {
-        throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
-                + "partition on table {} partition {} failed. Event processing cannot "
-                + "continue. Issue and invalidate command to reset the event processor "
-                + "state.", getFullyQualifiedTblName(),
-            constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
       }
     }
 
@@ -1532,29 +1556,34 @@ public class MetastoreEvents {
       // We do not need self event as dropPartition() call is a no-op if the directory
       // doesn't exist.
       try {
-        boolean success = true;
-        // We refresh all the partitions that were dropped from HMS. If a refresh
-        // fails, we throw a MetastoreNotificationNeedsInvalidateException
-        infoLog("{} partitions dropped from table {}. Trying "
-            + "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
-        for (Map<String, String> partSpec : droppedPartitions_) {
-          List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
-          for (Map.Entry<String, String> entry : partSpec.entrySet()) {
-            tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
+        // Reload the whole table if it's a transactional table.
+        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+          reloadTableFromCatalog("DROP_PARTITION");
+        } else {
+          boolean success = true;
+          // We refresh all the partitions that were dropped from HMS. If a refresh
+          // fails, we throw a MetastoreNotificationNeedsInvalidateException.
+          infoLog("{} partitions dropped from table {}. Trying "
+              + "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
+          for (Map<String, String> partSpec : droppedPartitions_) {
+            List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
+            for (Map.Entry<String, String> entry : partSpec.entrySet()) {
+              tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
+            }
+            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+                "processing DROP_PARTITION event from HMS")) {
+              debugLog("Could not refresh partitions of table {} as table "
+                      + "was not present in the catalog.",
+                      getFullyQualifiedTblName());
+              success = false;
+              break;
+            }
           }
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-              "processing DROP_PARTITION event from HMS")) {
-            debugLog("Could not refresh partition {} of table {} as table "
-                    + "was not present in the catalog.",
-                    getFullyQualifiedTblName());
-            success = false;
-            break;
+          if (success) {
+            infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
+                getFullyQualifiedTblName());
           }
         }
-        if (success) {
-          infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
-              getFullyQualifiedTblName());
-        }
       } catch (DatabaseNotFoundException e) {
         debugLog("Could not refresh partitions of table {}"
             + "as database was not present in the catalog.", getFullyQualifiedTblName());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1d1fc15..0177c8d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -83,6 +83,7 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PartitionStatsUtil;
+import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
@@ -91,7 +92,6 @@ import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
-import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -3524,6 +3524,22 @@ public class CatalogOpExecutor {
     return msTbl;
   }
 
+  /**
+   * Returns the metastore.api.Table object from the Hive Metastore for an existing
+   * fully loaded table, or null if the RPC fails. Uses the HMS client from 'catalog_'.
+   */
+  private org.apache.hadoop.hive.metastore.api.Table getTableFromMetaStore(
+      TableName tblName) throws CatalogException {
+    Preconditions.checkNotNull(tblName);
+    org.apache.hadoop.hive.metastore.api.Table msTbl = null;
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msTbl = msClient.getHiveClient().getTable(tblName.getDb(), tblName.getTbl());
+    } catch (TException e) {
+      LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
+    }
+    return msTbl;
+  }
+
   private static List<FieldSchema> buildFieldSchemaList(List<TColumn> columns) {
     List<FieldSchema> fsList = Lists.newArrayList();
     // Add in all the columns
@@ -3591,15 +3607,45 @@ public class CatalogOpExecutor {
         if (tbl != null) {
           // If the table is not loaded, no need to perform refresh after the initial
           // metadata load.
-          boolean needsRefresh = tbl.isLoaded();
+          boolean isTableLoadedInCatalog = tbl.isLoaded();
           tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
               "Load triggered by " + cmdString);
           if (tbl != null) {
-            if (needsRefresh) {
+            if (isTableLoadedInCatalog) {
+              boolean isTransactional = AcidUtils.isTransactionalTable(
+                  tbl.getMetaStoreTable().getParameters());
               if (req.isSetPartition_spec()) {
+                Preconditions.checkArgument(!isTransactional);
                 updatedThriftTable = catalog_.reloadPartition(tbl,
                     req.getPartition_spec(), cmdString);
               } else {
+                if (isTransactional) {
+                  org.apache.hadoop.hive.metastore.api.Table hmsTbl =
+                      getTableFromMetaStore(tblName);
+                  if (hmsTbl == null) {
+                    throw new TableNotFoundException("Table not found: " +
+                        tblName.toString());
+                  }
+                  HdfsTable hdfsTable = (HdfsTable)tbl;
+                  if (!hdfsTable.isPartitioned() &&
+                      MetastoreShim.getWriteIdFromMSTable(tbl.getMetaStoreTable()) ==
+                      MetastoreShim.getWriteIdFromMSTable(hmsTbl)) {
+                    // No need to refresh the table if the local writeId equals the
+                    // latest writeId from HMS and the table is not partitioned.
+                    LOG.debug("Skip reloading table " + tblName.toString() +
+                        " because it has the latest writeId locally");
+                    resp.getResult().setStatus(new TStatus(TErrorCode.OK,
+                        new ArrayList<String>()));
+                    return resp;
+                  }
+                  // TODO IMPALA-8809: Optimisation for partitioned tables:
+                  //   1: Reload the whole table if schema change happened. Identify
+                  //     such scenario by checking Table.TBL_PROP_LAST_DDL_TIME property.
+                  //     Note, table level writeId is not updated by HMS for partitioned
+                  //     ACID tables, there is a Jira to cover this: HIVE-22062.
+                  //   2: If no need for a full table reload then fetch partition level
+                  //     writeIds and reload only the ones that changed.
+                }
                 updatedThriftTable = catalog_.reloadTable(tbl, cmdString);
               }
             } else {
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 4f4ee7a..a27888d 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -43,7 +43,7 @@ import javax.annotation.Nullable;
  * Contains utility functions for working with Acid tables.
  * <p>
  * The code is mostly copy pasted from Hive. Ideally we should use the
- * the code directly from Hive.
+ * code directly from Hive.
  * </p>
  */
 public class AcidUtils {
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index ebca3cf..909708c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -611,6 +611,15 @@ public class AnalyzerTest extends FrontendTestBase {
 
     AnalyzesOk("show column stats functional_orc_def.full_transactional_table");
     AnalyzesOk("show column stats functional.insert_only_transactional_table");
+
+    AnalyzesOk("refresh functional.insert_only_transactional_table");
+    AnalyzesOk("refresh functional_orc_def.full_transactional_table");
+    AnalysisError("refresh functional.insert_only_transactional_table partition (j=1)",
+        "Refresh a partition is not allowed on transactional tables. Try to refresh " +
+        "the whole table instead.");
+    AnalysisError("refresh functional_orc_def.full_transactional_table partition (j=1)",
+        "Refresh a partition is not allowed on transactional tables. Try to refresh " +
+        "the whole table instead.");
   }
 
   @Test
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test
index 3612e18..dac75f4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test
@@ -24,6 +24,14 @@ select * from tt order by x;
 1
 2
 ====
+---- QUERY
+# Do a second refresh on an already refreshed ACID table.
+refresh tt;
+select * from tt order by x;
+---- RESULTS
+1
+2
+====
 ---- HIVE_QUERY
 use $DATABASE;
 insert overwrite table tt values (3);
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 4397e25..2aa22d4 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -21,6 +21,8 @@ import time
 import requests
 
 from tests.common.environ import build_flavor_timeout
+from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
+    SkipIfLocal, SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
@@ -39,7 +41,20 @@ class TestEventProcessing(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
+  @SkipIfHive2.acid
+  def test_insert_events_transactional(self):
+    """Executes 'run_test_insert_events' for transactional tables.
+    """
+    self.run_test_insert_events(is_transactional=True)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
   def test_insert_events(self):
+    """Executes 'run_test_insert_events' for non-transactional tables.
+    """
+    self.run_test_insert_events()
+
+  def run_test_insert_events(self, is_transactional=False):
     """Test for insert event processing. Events are created in Hive and processed in
     Impala. The following cases are tested :
     Insert into table --> for partitioned and non-partitioned table
@@ -50,9 +65,14 @@ class TestEventProcessing(CustomClusterTestSuite):
     with HiveDbWrapper(self, db_name):
      # Test table with no partitions.
      TBL_INSERT_NOPART = 'tbl_insert_nopart'
+     self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_NOPART))
      last_synced_event_id = self.get_last_synced_event_id()
-     self.run_stmt_in_hive("create table %s.%s (id int, val int)"
-         % (db_name, TBL_INSERT_NOPART))
+     TBLPROPERTIES = ""
+     if is_transactional:
+       TBLPROPERTIES = "TBLPROPERTIES ('transactional'='true'," \
+           "'transactional_properties'='insert_only')"
+     self.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
+         % (db_name, TBL_INSERT_NOPART, TBLPROPERTIES))
      # Test insert into table, this will fire an insert event.
      self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
          % (db_name, TBL_INSERT_NOPART))
@@ -76,8 +96,10 @@ class TestEventProcessing(CustomClusterTestSuite):
      # Test partitioned table.
      last_synced_event_id = self.get_last_synced_event_id()
      TBL_INSERT_PART = 'tbl_insert_part'
+     self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_PART))
      self.run_stmt_in_hive("create table %s.%s (id int, name string) "
-         "partitioned by(day int, month int, year int)" % (db_name, TBL_INSERT_PART))
+         "partitioned by(day int, month int, year int) %s"
+         % (db_name, TBL_INSERT_PART, TBLPROPERTIES))
      # Insert data into partitions.
      self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
          "values(101, 'x')" % (db_name, TBL_INSERT_PART))