Posted to commits@impala.apache.org by cs...@apache.org on 2019/08/14 09:59:36 UTC

[impala] branch master updated (fd86672 -> cef76db)

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from fd86672  IMPALA-8829: [DOCS] Document limitation of parsing "TB" string
     new 1908e44  IMPALA-4551: Limit the size of SQL statements
     new e0c0fe9  IMPALA-8791: Handle the case where there is no fragment scheduled on the coordinator
     new cef76db  IMPALA-8854: skip test_acid_insert

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/coordinator-backend-state.cc        |  14 +-
 be/src/runtime/coordinator-backend-state.h         |   8 +-
 be/src/runtime/coordinator.cc                      |  12 +-
 be/src/runtime/coordinator.h                       |   4 +-
 be/src/scheduling/admission-controller-test.cc     |  33 +++--
 be/src/scheduling/admission-controller.cc          |  61 ++++----
 be/src/scheduling/query-schedule.cc                |  57 ++++----
 be/src/scheduling/query-schedule.h                 |  43 +++---
 be/src/scheduling/scheduler.cc                     |  15 +-
 be/src/scheduling/scheduler.h                      |   5 +-
 be/src/service/impala-server.cc                    |   8 ++
 be/src/service/query-options-test.cc               |   7 +-
 be/src/service/query-options.cc                    |  26 ++++
 be/src/service/query-options.h                     |  17 ++-
 common/thrift/ImpalaInternalService.thrift         |  12 ++
 common/thrift/ImpalaService.thrift                 |  14 ++
 common/thrift/generate_error_codes.py              |   3 +
 .../apache/impala/analysis/AnalysisContext.java    |   8 ++
 .../java/org/apache/impala/analysis/Analyzer.java  |  17 +++
 .../main/java/org/apache/impala/analysis/Expr.java |  23 +++
 .../java/org/apache/impala/planner/Planner.java    |  16 ++-
 .../apache/impala/analysis/AnalyzeExprsTest.java   | 159 +++++++++++++++++++++
 .../QueryTest/dedicated-coord-mem-estimates.test   |   8 +-
 tests/common/impala_connection.py                  |  31 +++-
 tests/custom_cluster/test_admission_controller.py  |  16 ++-
 tests/query_test/test_exprs.py                     |  67 ++++++++-
 tests/query_test/test_insert.py                    |   1 +
 27 files changed, 549 insertions(+), 136 deletions(-)


[impala] 03/03: IMPALA-8854: skip test_acid_insert

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cef76db392bfe83507ff703b054a73e811358860
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Aug 13 14:09:18 2019 -0700

    IMPALA-8854: skip test_acid_insert
    
    The test is failing because of a Hive version change in some
    configurations. Disabling for now until it can be fixed.
    
    Change-Id: I3bc5cce8b9c3843b5bb8ac4d29b2219411f671b4
    Reviewed-on: http://gerrit.cloudera.org:8080/14056
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 tests/query_test/test_insert.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index be298a7..a630b3e 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -139,6 +139,7 @@ class TestInsertQueries(ImpalaTestSuite):
   @pytest.mark.execute_serially
   @SkipIfHive2.acid
   def test_acid_insert(self, vector):
+    pytest.skip("IMPALA-8854: skipping to unbreak builds")
     if (vector.get_value('table_format').file_format == 'parquet'):
       vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
           vector.get_value('compression_codec')


[impala] 02/03: IMPALA-8791: Handle the case where there is no fragment scheduled on the coordinator

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e0c0fe988ac7c0c4b274c472ec544965d19c6184
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Fri Aug 2 09:45:51 2019 -0700

    IMPALA-8791: Handle the case where there is no fragment scheduled on
    the coordinator
    
    This patch fixes a bug where, if an insert or CTAS query has no
    fragments scheduled on the coordinator and a mem limit is to be
    enforced on the query (either through a query option or
    automatically through estimates), the same limit was also applied
    to the coordinator backend even though it does not execute anything.
    
    Highlights:
    - coord_backend_mem_to_admit_/mem_limit will always refer to the memory
    to admit/limit for the coordinator, regardless of which fragments are
    scheduled on it.
    
    - There will always be a BackendExecParams added for the coordinator,
    because the coordinator always spawns a QueryState object with a
    mem_tracker for tracking runtime filter memory and the result set
    cache. For the case where this BackendExecParams is empty (no instances
    scheduled), it ensures that some minimal amount of memory is accounted
    for by the admission controller and that the right mem limit is applied
    to the QueryState spawned by the coordinator.
    
    - Added changes to the Coordinator and Coordinator::BackendState
    classes to handle an empty BackendExecParams object (see the sketch
    below).
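
    A minimal illustration of the empty-backend control flow described
    above (plain Python rather than the Impala C++ in the diff below;
    the field names are illustrative):

        def exec_backend(backend):
            # Mirrors the described behavior: a backend with no fragment
            # instances must be the coordinator, and no
            # ExecQueryFInstances RPC is issued for it.
            if not backend["instance_params"]:
                assert backend["is_coord_backend"]
                return "done"
            return "rpc issued"

        print(exec_backend({"instance_params": [], "is_coord_backend": True}))
        print(exec_backend({"instance_params": ["f0"], "is_coord_backend": False}))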
    
    Testing:
    The following cases need to be tested, based on which kinds of
    fragments are scheduled on the coordinator backend:
    1. Coordinator fragment + other exec fragments
    2. Coordinator fragment only
    3. other exec fragments only (e.g. insert into values OR insert
       into select 1)
    4. No fragments, but coordinator still creates a QueryState
    
    Case 1 is covered by tests working with non-dedicated coordinators.
    The rest are covered by test_dedicated_coordinator_planner_estimates and
    test_sanity_checks_dedicated_coordinator in
    test_admission_controller.py
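
    For reference, the admission accounting this converges on is simple:
    with the coordinator always present in per_backend_exec_params_, the
    cluster-wide memory to admit is one coordinator share plus one
    executor share per remaining backend. A small Python sketch with
    illustrative names:

        MB = 1 << 20

        def cluster_mem_to_admit(num_backends, per_backend_mem, coord_mem):
            # num_backends counts the coordinator entry, which always
            # exists now, even with no fragment instances scheduled on it.
            assert num_backends >= 1
            return per_backend_mem * (num_backends - 1) + coord_mem

        # Coordinator + 2 executors, 100MB per executor and 20MB for an
        # "empty" coordinator backend admit 220MB cluster-wide.
        print(cluster_mem_to_admit(3, 100 * MB, 20 * MB) // MB)  # 220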
    
    Change-Id: Ic4fba02bb7b20553a20634f8c5989d43ba2c0721
    Reviewed-on: http://gerrit.cloudera.org:8080/14058
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc        | 14 ++++-
 be/src/runtime/coordinator-backend-state.h         |  8 ++-
 be/src/runtime/coordinator.cc                      | 12 ++---
 be/src/runtime/coordinator.h                       |  4 +-
 be/src/scheduling/admission-controller-test.cc     | 33 ++++++++----
 be/src/scheduling/admission-controller.cc          | 61 ++++++++++------------
 be/src/scheduling/query-schedule.cc                | 57 +++++++++++---------
 be/src/scheduling/query-schedule.h                 | 43 +++++++--------
 be/src/scheduling/scheduler.cc                     | 15 +++---
 be/src/scheduling/scheduler.h                      |  5 +-
 .../java/org/apache/impala/planner/Planner.java    | 16 +++---
 .../QueryTest/dedicated-coord-mem-estimates.test   |  8 +--
 tests/custom_cluster/test_admission_controller.py  | 16 +++++-
 13 files changed, 167 insertions(+), 125 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 028dd70..f3d81a9 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -69,8 +69,8 @@ void Coordinator::BackendState::Init(
     const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
     RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
-  host_ = backend_exec_params_->instance_params[0]->host;
-  krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
+  host_ = backend_exec_params_->be_desc.address;
+  krpc_host_ = backend_exec_params_->be_desc.krpc_address;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
   host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
@@ -187,6 +187,14 @@ void Coordinator::BackendState::Exec(
     last_report_time_ms_ = GenerateReportTimestamp();
     exec_complete_barrier->Notify();
   });
+
+  // Do not issue an ExecQueryFInstances RPC if there are no fragment instances
+  // scheduled to run on this backend.
+  if (IsEmptyBackend()) {
+    DCHECK(backend_exec_params_->is_coord_backend);
+    return;
+  }
+
   std::unique_ptr<ControlServiceProxy> proxy;
   Status get_proxy_status = ControlService::GetProxy(krpc_host_, host_.hostname, &proxy);
   if (!get_proxy_status.ok()) {
@@ -349,6 +357,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     const ReportExecStatusRequestPB& backend_exec_status,
     const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
     ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
+  DCHECK(!IsEmptyBackend());
   // Hold the exec_summary's lock to avoid exposing it half-way through
   // the update loop below.
   lock_guard<SpinLock> l1(exec_summary->lock);
@@ -453,6 +462,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
 
 void Coordinator::BackendState::UpdateHostProfile(
     const TRuntimeProfileTree& thrift_profile) {
+  DCHECK(!IsEmptyBackend());
   host_profile_->Update(thrift_profile);
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 4c41c8c..4b0cfad 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -52,7 +52,9 @@ class ExecSummary;
 struct FInstanceExecParams;
 
 /// This class manages all aspects of the execution of all fragment instances of a
-/// single query on a particular backend.
+/// single query on a particular backend. For the coordinator backend it is possible to
+/// have no fragment instances scheduled on it. In that case, no RPCs are issued and the
+/// BackendState transitions to the 'done' state right after Exec() is called on it.
 /// Thread-safe unless pointed out otherwise.
 class Coordinator::BackendState {
  public:
@@ -72,6 +74,7 @@ class Coordinator::BackendState {
   /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
   /// communicated through GetStatus(). Uses filter_routing_table to remove filters
   /// that weren't selected during its construction.
+  /// No RPC is issued if there are no fragment instances scheduled on this backend.
   /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
   /// on their node_id/instance_idx.
   void Exec(const DebugOptions& debug_options,
@@ -159,6 +162,9 @@ class Coordinator::BackendState {
   /// Returns a timestamp using monotonic time for tracking arrival of status reports.
   static int64_t GenerateReportTimestamp() { return MonotonicMillis(); }
 
+  /// Returns True if there are no fragment instances scheduled on this backend.
+  bool IsEmptyBackend() { return backend_exec_params_->instance_params.empty(); }
+
  private:
   /// Execution stats for a single fragment instance.
   /// Not thread-safe.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index bd4fb75..4df15d9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,14 +111,8 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
-  // If there is no coord fragment then pick the mem limit computed for an executor.
-  // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
-  // fragments are scheduled to run on the coordinator backend.
-  int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
-      schedule_.coord_backend_mem_limit() :
-      schedule_.per_backend_mem_limit();
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx(), coord_mem_limit);
+      query_ctx(), schedule_.coord_backend_mem_limit());
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
@@ -411,6 +405,10 @@ Status Coordinator::FinishBackendStartup() {
       max_latency_host = TNetworkAddressToString(backend_state->impalad_address());
     }
     latencies.Update(backend_state->rpc_latency());
+    // Mark backend complete if no fragment instances were assigned to it.
+    if (backend_state->IsEmptyBackend()) {
+      backend_exec_complete_barrier_->Notify();
+    }
   }
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9098dd0..72ab3af 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -482,7 +482,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Helper for Exec(). Checks for errors encountered when starting backend execution,
   /// using any non-OK status, if any, as the overall status. Returns the overall
-  /// status. Also updates query_profile_ with the startup latency histogram.
+  /// status. Also updates query_profile_ with the startup latency histogram and the
+  /// backend_exec_complete_barrier_ if there is any backend which is already done (only
+  /// possible at this point if no fragment instances were assigned to it).
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index a241525..17c8457 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -97,11 +97,10 @@ class AdmissionControllerTest : public testing::Test {
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
     query_options->__set_mem_limit(mem_limit);
     QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
-        *query_id, *request, *query_options, is_dedicated_coord, profile));
+        *query_id, *request, *query_options, profile));
     query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+    SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord);
     query_schedule->UpdateMemoryRequirements(config);
-
-    SetHostsInQuerySchedule(*query_schedule, num_hosts);
     return query_schedule;
   }
 
@@ -113,14 +112,24 @@ class AdmissionControllerTest : public testing::Test {
   }
 
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
+  /// Note: no FInstanceExecParams are added so
+  /// QuerySchedule::UseDedicatedCoordEstimates() would consider this schedule as not
+  /// having anything scheduled on the backend, which would result in always returning
+  /// true if a dedicated coordinator backend exists.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
-      int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
+      bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
+      int64_t admit_mem_limit = 200L * MEGABYTE) {
     PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
     for (int i = 0; i < count; ++i) {
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
-      backend_exec_params->admit_mem_limit = admit_mem_limit;
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
-      if (i == 0) backend_exec_params->contains_coord_fragment = true;
+      backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit);
+      backend_exec_params->be_desc.__set_is_executor(true);
+      if (i == 0) {
+        // Add first element as the coordinator.
+        backend_exec_params->is_coord_backend = true;
+        backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord);
+      }
       const string host_name = Substitute("host$0", i);
       per_backend_exec_params->emplace(
           MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -479,7 +488,7 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
 
   // Adjust the QuerySchedule to have minimum memory reservation of 45MB.
   // This will be rejected immediately as minimum memory reservation is too high.
-  SetHostsInQuerySchedule(*query_schedule, host_count, 45L * MEGABYTE);
+  SetHostsInQuerySchedule(*query_schedule, host_count, false, 45L * MEGABYTE);
   string rejected_reserved_reason;
   ASSERT_TRUE(admission_controller->RejectForSchedule(
       *query_schedule, config_d, host_count, host_count, &rejected_reserved_reason));
@@ -799,17 +808,19 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
   // Add coordinator backend.
   BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
-  coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
-  coord_exec_params->contains_coord_fragment = true;
+  coord_exec_params->is_coord_backend = true;
   coord_exec_params->thread_reservation = 1;
+  coord_exec_params->be_desc.__set_admit_mem_limit(512 * MEGABYTE);
+  coord_exec_params->be_desc.__set_is_executor(false);
   const string coord_host_name = Substitute("host$0", 1);
   TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
   const string coord_host = TNetworkAddressToString(coord_addr);
   per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
   // Add executor backend.
   BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
-  backend_exec_params->admit_mem_limit = GIGABYTE;
   backend_exec_params->thread_reservation = 1;
+  backend_exec_params->be_desc.__set_admit_mem_limit(GIGABYTE);
+  backend_exec_params->be_desc.__set_is_executor(true);
   const string exec_host_name = Substitute("host$0", 2);
   TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
   const string exec_host = TNetworkAddressToString(exec_addr);
@@ -853,7 +864,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
   // coord's admit_mem_limit is less than mem_to_admit on the coord.
   // Re-using previous QuerySchedule object.
-  coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
+  coord_exec_params->be_desc.__set_admit_mem_limit(100 * MEGABYTE);
   (*per_backend_exec_params)[coord_addr] = *coord_exec_params;
   query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
   ASSERT_TRUE(admission_controller->RejectForSchedule(
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index d31c50b..666659b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,8 +56,6 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
-DECLARE_bool(is_executor);
-
 namespace impala {
 
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -398,7 +396,7 @@ void AdmissionController::UpdateHostStats(
   int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
-    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+    int64_t mem_to_admit = entry.second.is_coord_backend ?
         schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
     if (!is_admitting) mem_to_admit *= -1;
     const string host = TNetworkAddressToString(host_addr);
@@ -441,9 +439,8 @@ bool AdmissionController::CanAccommodateMaxInitialReservation(
   const int64_t coord_min_reservation = schedule.coord_min_reservation();
   return CanMemLimitAccommodateReservation(
              executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
-      && (!schedule.requiresCoordinatorFragment()
-             || CanMemLimitAccommodateReservation(
-                    coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
+      && CanMemLimitAccommodateReservation(
+             coord_mem_limit, coord_min_reservation, mem_unavailable_reason);
 }
 
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -483,11 +480,11 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_mem_limit = entry.second.admit_mem_limit;
+    int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit;
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
-    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+    int64_t mem_to_admit = entry.second.is_coord_backend ?
         coord_mem_to_admit : executor_mem_to_admit;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
@@ -519,7 +516,7 @@ bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_num_queries_limit = entry.second.admit_num_queries_limit;
+    int64_t admit_num_queries_limit = entry.second.be_desc.admit_num_queries_limit;
     int64_t num_admitted = host_stats_[host_id].num_admitted;
     VLOG_ROW << "Checking available slot on host=" << host_id
              << " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
@@ -640,21 +637,21 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
       nullptr, std::numeric_limits<int64_t>::max());
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
-    cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
-    if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
-      largest_min_mem_reservation =
-          make_pair(&e.first, e.second.min_mem_reservation_bytes);
+    const BackendExecParams& bp = e.second;
+    cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
+    if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+      largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
     }
-    cluster_thread_reservation += e.second.thread_reservation;
-    if (e.second.thread_reservation > max_thread_reservation.second) {
-      max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
+    cluster_thread_reservation += bp.thread_reservation;
+    if (bp.thread_reservation > max_thread_reservation.second) {
+      max_thread_reservation = make_pair(&e.first, bp.thread_reservation);
     }
-    if (e.second.contains_coord_fragment) {
+    if (bp.is_coord_backend) {
       coord_admit_mem_limit.first = &e.first;
-      coord_admit_mem_limit.second = e.second.admit_mem_limit;
-    } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
+      coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
+    } else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) {
       min_executor_admit_mem_limit.first = &e.first;
-      min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
+      min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
     }
   }
 
@@ -724,17 +721,15 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
               TNetworkAddressToString(*min_executor_admit_mem_limit.first));
       return true;
     }
-    if (schedule.requiresCoordinatorFragment()) {
-      int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
-      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
-               << coord_mem_to_admit
-               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
-      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
-        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
-            TNetworkAddressToString(*coord_admit_mem_limit.first));
-        return true;
-      }
+    int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+    VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+             << coord_mem_to_admit
+             << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+    if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+          PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+          TNetworkAddressToString(*coord_admit_mem_limit.first));
+      return true;
     }
   }
   return false;
@@ -1148,11 +1143,9 @@ Status AdmissionController::ComputeGroupSchedules(
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy());
     DCHECK_GT(executor_group->NumExecutors(), 0);
-    bool is_dedicated_coord = !FLAGS_is_executor;
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
-            request.query_options, is_dedicated_coord, request.summary_profile,
-            request.query_events);
+            request.query_options, request.summary_profile, request.query_events);
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index fceee52..86fe8fd 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -43,29 +43,26 @@ DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
 namespace impala {
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, bool is_dedicated_coord,
-    RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
+    const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+    RuntimeProfile::EventSequence* query_events)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     query_events_(query_events),
     num_scan_ranges_(0),
-    next_instance_id_(query_id),
-    is_dedicated_coord_(is_dedicated_coord) {
+    next_instance_id_(query_id) {
   Init();
 }
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, bool is_dedicated_coord,
-    RuntimeProfile* summary_profile)
+    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     num_scan_ranges_(0),
-    next_instance_id_(query_id),
-    is_dedicated_coord_(is_dedicated_coord) {
+    next_instance_id_(query_id) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -89,7 +86,7 @@ void QuerySchedule::Init() {
 
   // mark coordinator fragment
   const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
-  if (requiresCoordinatorFragment()) {
+  if (RequiresCoordinatorFragment()) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
     next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -182,7 +179,11 @@ void QuerySchedule::Validate() const {
       }
     }
   }
-  // TODO: add validation for BackendExecParams
+
+  for (const auto& elem: per_backend_exec_params_) {
+    const BackendExecParams& bp = elem.second;
+    DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
+  }
 }
 
 int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
@@ -223,7 +224,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  DCHECK(requiresCoordinatorFragment());
+  DCHECK(RequiresCoordinatorFragment());
   const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
   const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
   DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -247,20 +248,25 @@ int QuerySchedule::GetNumFragmentInstances() const {
 }
 
 int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
-  if (!requiresCoordinatorFragment()) {
-    // For this case, there will be no coordinator fragment so only use the per
-    // executor mem to admit while accounting for admitted memory. This will also ensure
-    // the per backend mem admitted accounting is consistent with the cluster-wide mem
-    // admitted.
-    return per_backend_mem_to_admit() * per_backend_exec_params_.size();
-  } else {
-    return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
-        + coord_backend_mem_to_admit();
-  }
+  // There will always be an entry for the coordinator in per_backend_exec_params_.
+  return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+      + coord_backend_mem_to_admit();
 }
 
 bool QuerySchedule::UseDedicatedCoordEstimates() const {
-  return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
+  for (auto& itr : per_backend_exec_params_) {
+    if (!itr.second.is_coord_backend) continue;
+    auto& coord = itr.second;
+    bool is_dedicated_coord = !coord.be_desc.is_executor;
+    bool only_coord_fragment_scheduled =
+        RequiresCoordinatorFragment() && coord.instance_params.size() == 1;
+    bool no_fragment_scheduled = coord.instance_params.size() == 0;
+    return FLAGS_use_dedicated_coordinator_estimates && is_dedicated_coord
+        && (only_coord_fragment_scheduled || no_fragment_scheduled);
+  }
+  DCHECK(false)
+      << "Coordinator backend should always have a entry in per_backend_exec_params_";
+  return false;
 }
 
 void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
@@ -269,6 +275,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   // mem_limit if it is set in the query options, else sets it to -1 (no limit).
   bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
+  bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
 
   per_backend_mem_to_admit_ = 0;
   coord_backend_mem_to_admit_ = 0;
@@ -281,7 +288,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
 
   if (!has_query_option) {
     per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
-    coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
+    coord_backend_mem_to_admit_ = use_dedicated_coord_estimates ?
         GetDedicatedCoordMemoryEstimate() :
         GetPerExecutorMemoryEstimate();
     if (!mimic_old_behaviour) {
@@ -299,7 +306,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
-      if (!UseDedicatedCoordEstimates() || has_query_option) {
+      if (!use_dedicated_coord_estimates || has_query_option) {
         // The minimum mem limit option does not apply to dedicated coordinators -
         // this would result in over-reserving of memory. Treat coordinator and
         // executor mem limits the same if the query option was explicitly set.
@@ -321,7 +328,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
 
   // If the query is only scheduled to run on the coordinator.
-  if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
+  if (per_backend_exec_params_.size() == 1 && RequiresCoordinatorFragment()) {
     per_backend_mem_to_admit_ = 0;
   }
 
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index ad169c7..6725774 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -45,7 +45,9 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
-/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
+/// Execution parameters for a single backend. This gets created for every backend that
+/// participates in query execution, which includes every backend that has fragments
+/// scheduled on it and the coordinator backend. Computed by Scheduler::Schedule(), set
 /// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
 /// AdmissionController and a BackendState.
 struct BackendExecParams {
@@ -54,6 +56,8 @@ struct BackendExecParams {
   /// The fragment instance params assigned to this backend. All instances of a
   /// particular fragment are contiguous in this vector. Query lifetime;
   /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
+  /// This can be empty only for the coordinator backend, that is, if 'is_coord_backend'
+  /// is true.
   std::vector<const FInstanceExecParams*> instance_params;
 
   // The minimum query-wide buffer reservation size (in bytes) required for this backend.
@@ -74,16 +78,8 @@ struct BackendExecParams {
   // concurrently-executing fragment instances at any point in query execution.
   int64_t thread_reservation = 0;
 
-  // The maximum bytes of memory that can be admitted to this backend by the
-  // admission controller. Obtained from the scheduler's executors configuration
-  // which is updated by membership updates from the statestore.
-  int64_t admit_mem_limit = 0;
-
-  // The maximum number of queries that this backend can execute concurrently.
-  int64_t admit_num_queries_limit = 0;
-
-  // Indicates whether this backend will run the coordinator fragment.
-  bool contains_coord_fragment = false;
+  // Indicates whether this backend is the coordinator.
+  bool is_coord_backend = false;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -162,18 +158,17 @@ struct FragmentExecParams {
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, bool is_dedicated_coord,
-      RuntimeProfile* summary_profile,
+      const TQueryOptions& query_options, RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
   /// For testing only: build a QuerySchedule object but do not run Init().
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, bool is_dedicated_coord,
-      RuntimeProfile* summary_profile);
+      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
 
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
   /// - all scan ranges are assigned
+  /// - all BackendExecParams have instances assigned except for coordinator.
   void Validate() const;
 
   const TUniqueId& query_id() const { return query_id_; }
@@ -287,6 +282,11 @@ class QuerySchedule {
 
   /// Returns true if coordinator estimates calculated by the planner and specialized for
+  /// a dedicated coordinator are to be used for estimating memory requirements.
+  /// This happens when the following conditions are true:
+  /// 1. Coordinator fragment is scheduled on a dedicated coordinator
+  /// 2. Either only the coordinator fragment or no fragments are scheduled on the
+  /// coordinator backend. This essentially means that no executor fragments are scheduled
+  /// on the coordinator backend.
   bool UseDedicatedCoordEstimates() const;
 
   /// Populates or updates the per host query memory limit and the amount of memory to be
@@ -299,11 +299,6 @@ class QuerySchedule {
 
   void set_executor_group(string executor_group);
 
-  /// Returns true if a coordinator fragment is required based on the query stmt type.
-  bool requiresCoordinatorFragment() const {
-    return request_.stmt_type == TStmtType::QUERY;
-  }
-
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -366,9 +361,6 @@ class QuerySchedule {
   /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
   int64_t coord_min_reservation_ = 0;
 
-  /// Indicates whether coordinator fragment will be running on a dedicated coordinator.
-  bool is_dedicated_coord_ = false;
-
   /// The name of the executor group that this schedule was computed for. Set by the
   /// Scheduler and only valid after scheduling completes successfully.
   string executor_group_;
@@ -377,6 +369,11 @@ class QuerySchedule {
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
+
+  /// Returns true if a coordinator fragment is required based on the query stmt type.
+  bool RequiresCoordinatorFragment() const {
+    return request_.stmt_type == TStmtType::QUERY;
+  }
 };
 }
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index b5e6807..467694b 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,21 +673,22 @@ void Scheduler::ComputeBackendExecParams(
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
       be_params.thread_reservation += f.fragment.thread_reservation;
-      if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
     }
   }
 
-  int64_t largest_min_reservation = 0;
+  // This also ensures an entry always exists for the coordinator backend.
   int64_t coord_min_reservation = 0;
+  const TNetworkAddress& coord_addr = executor_config.local_be_desc.address;
+  BackendExecParams& coord_be_params = per_backend_params[coord_addr];
+  coord_be_params.is_coord_backend = true;
+  coord_min_reservation = coord_be_params.min_mem_reservation_bytes;
+
+  int64_t largest_min_reservation = 0;
   for (auto& backend : per_backend_params) {
     const TNetworkAddress& host = backend.first;
     const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
-    backend.second.admit_mem_limit = be_desc.admit_mem_limit;
-    backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
     backend.second.be_desc = be_desc;
-    if (backend.second.contains_coord_fragment) {
-      coord_min_reservation = backend.second.min_mem_reservation_bytes;
-    } else {
+    if (!backend.second.is_coord_backend) {
       largest_min_reservation =
           max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
     }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 01ebb3f..34ac10d 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -343,8 +343,9 @@ class Scheduler {
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
       FragmentScanRangeAssignment* assignment);
 
-  /// Computes BackendExecParams for all backends assigned in the query. Must be called
-  /// after ComputeFragmentExecParams().
+  /// Computes BackendExecParams for all backends assigned in the query and always one for
+  /// the coordinator backend since it participates in execution regardless. Must be
+  /// called after ComputeFragmentExecParams().
   void ComputeBackendExecParams(
       const ExecutorConfig& executor_config, QuerySchedule* schedule);
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index b9d1f30..dbcff87 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -441,12 +441,16 @@ public class Planner {
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
         maxPerHostPeakResources.getThreadReservation());
-    // Assuming the root fragment will always run on the coordinator backend, which
-    // might not be true for queries that don't have a coordinator fragment
-    // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
-    request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
-        .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
-        DEDICATED_COORD_SAFETY_BUFFER_BYTES));
+    if (getAnalysisResult().isQueryStmt()) {
+      request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+          .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+          DEDICATED_COORD_SAFETY_BUFFER_BYTES));
+    } else {
+      // For queries that don't have a coordinator fragment, estimate a small
+      // amount of memory that the query state spawned on the coordinator can use.
+      request.setDedicated_coord_mem_estimate(totalRuntimeFilterMemBytes +
+          DEDICATED_COORD_SAFETY_BUFFER_BYTES);
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
           maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
index cd53dac..39b505f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -4,8 +4,8 @@
 create table test as select id from functional.alltypes where id > 1
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
-row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 132.00 MB.*
 ====
 ---- QUERY
 # Truncate table to run the following inserts.
@@ -24,8 +24,8 @@ row_regex: .*Cluster Memory Admitted: 10.00 MB.*
 insert into test select id from functional.alltypes where id > 3
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
-row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 132.00 MB.*
 ====
 ---- QUERY
 # SELECT with merging exchange (i.e. order by).
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index a180503..f0fd4ac 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -515,8 +515,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       fs_allocation_file="mem-limit-test-fair-scheduler.xml",
       llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
     cluster_size=2)
-  def test_sanity_checks_dedicated_coordinator(self, vector):
-    """Test for verifying targeted dedicated coordinator memory estimation behavior."""
+  def test_sanity_checks_dedicated_coordinator(self, vector, unique_database):
+    """Sanity tests for verifying targeted dedicated coordinator memory estimations and
+    behavior."""
     self.client.set_configuration_option('request_pool', "root.regularPool")
     ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
     exec_options = vector.get_value('exec_option')
@@ -547,6 +548,17 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       "mem_to_admit:" + str(mem_to_admit)
     self.client.close_query(handle)
 
+    # Make sure query execution works perfectly for a query that does not have any
+    # fragments scheduled on the coordinator, but has runtime filters that need to be
+    # aggregated at the coordinator.
+    exec_options = vector.get_value('exec_option')
+    exec_options['RUNTIME_FILTER_WAIT_TIME_MS'] = 30000
+    query = """CREATE TABLE {0}.temp_tbl AS SELECT STRAIGHT_JOIN o_orderkey
+    FROM tpch_parquet.lineitem INNER JOIN [SHUFFLE] tpch_parquet.orders
+    ON o_orderkey = l_orderkey GROUP BY 1""".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    assert "Runtime filters: All filters arrived" in result.runtime_profile
+
   def __verify_mem_accounting(self, vector, using_dedicated_coord_estimates):
     """Helper method used by test_dedicated_coordinator_*_mem_accounting that verifies
     the actual vs expected values for mem admitted and mem limit for both coord and


[impala] 01/03: IMPALA-4551: Limit the size of SQL statements

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1908e44c3c9faac8c7bf09422ca4c5ec598ffd58
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Jul 24 17:12:06 2019 -0700

    IMPALA-4551: Limit the size of SQL statements
    
    Various BI tools generate and run SQL. When used incorrectly or
    misconfigured, these tools can generate extremely large SQL
    statements, some reaching tens of megabytes. Large SQL
    statements impose costs throughout execution, including
    statement rewrite logic in the frontend and codegen in the
    backend. The resource usage of these statements can impact
    the stability of the system or the ability to run other SQL
    statements.
    
    This implements two new query options that provide controls
    to reject large SQL statements.
     - The first, MAX_STATEMENT_LENGTH_BYTES, is a cap on the
       total size of the SQL statement (in bytes). It is
       applied before any parsing or analysis. It uses a
       default value of 16MB.
     - The second, STATEMENT_EXPRESSION_LIMIT, is a limit on
       the total number of expressions in a statement or any
       views that it references. The limit is applied upon the
       first round of analysis, but it is not reapplied when
       statement rewrite rules are applied. Certain expressions
       such as literals in IN lists or VALUES clauses are not
       analyzed and do not count towards the limit. It uses
       a default value of 250,000.
    The two are complementary. Since enforcing the statement
    expression limit requires parsing and analyzing the
    statement, MAX_STATEMENT_LENGTH_BYTES sets an upper
    bound on the size of a statement that needs to be parsed
    and analyzed. Testing confirms that even statements
    approaching 16MB get through the first round of analysis
    within a few seconds and then are rejected.
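
    A rough sketch of the pre-parse gate (Python rather than the actual
    C++ in impala-server.cc below; the helper name is made up, and the
    message follows the new error code added in this change):

        DEFAULT_MAX_STATEMENT_LENGTH_BYTES = 16 * 1024 * 1024  # 16MB

        def check_statement_length(stmt,
                max_bytes=DEFAULT_MAX_STATEMENT_LENGTH_BYTES):
            # Applied before any parsing or analysis; values <= 0
            # disable the check, matching the option's semantics.
            length = len(stmt.encode("utf-8"))
            if max_bytes > 0 and length > max_bytes:
                raise ValueError(
                    "Statement length of %d bytes exceeds the maximum "
                    "statement length (%d bytes)" % (length, max_bytes))

        check_statement_length("select 1")  # ok; a 17MB string raises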
    
    This also changes the logging in tests/common/impala_connection.py
    to limit the total SQL size that it will print to 128KB. This
    prevents the JUnitXML (which includes this logging) from being too
    large. Existing tests do not run SQL larger than about 80KB, so
    this only applies to tests added in this change that run multi-MB
    SQLs to verify limits.
    
    Testing:
     - This adds frontend tests that verify the low-level
       semantics of how expressions are counted and that the
       expression limits are enforced.
     - This adds end-to-end tests that verify both the
       MAX_STATEMENT_LENGTH_BYTES and STATEMENT_EXPRESSION_LIMIT
       at their default values.
     - There is also an end-to-end test, run in exhaustive mode,
       that executes a SQL statement with close to 250,000
       expressions (sketched below).
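
    As a hedged illustration of how such a statement can be built (the
    real tests live in AnalyzeExprsTest.java and
    tests/query_test/test_exprs.py; the table and predicate shape here
    are made up):

        def build_wide_statement(num_preds):
            # Each "id = <n>" disjunct adds analyzed expressions that
            # count toward STATEMENT_EXPRESSION_LIMIT.
            preds = " or ".join("id = %d" % i for i in range(num_preds))
            return ("select count(*) from functional.alltypes where "
                    + preds)

        stmt = build_wide_statement(1000)
        print(len(stmt))  # size grows with the expression count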
    
    Change-Id: I5675fb4a08c1dc51ae5bcf467cbb969cc064602c
    Reviewed-on: http://gerrit.cloudera.org:8080/14012
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-server.cc                    |   8 ++
 be/src/service/query-options-test.cc               |   7 +-
 be/src/service/query-options.cc                    |  26 ++++
 be/src/service/query-options.h                     |  17 ++-
 common/thrift/ImpalaInternalService.thrift         |  12 ++
 common/thrift/ImpalaService.thrift                 |  14 ++
 common/thrift/generate_error_codes.py              |   3 +
 .../apache/impala/analysis/AnalysisContext.java    |   8 ++
 .../java/org/apache/impala/analysis/Analyzer.java  |  17 +++
 .../main/java/org/apache/impala/analysis/Expr.java |  23 +++
 .../apache/impala/analysis/AnalyzeExprsTest.java   | 159 +++++++++++++++++++++
 tests/common/impala_connection.py                  |  31 +++-
 tests/query_test/test_exprs.py                     |  67 ++++++++-
 13 files changed, 381 insertions(+), 11 deletions(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d0966ba..9c4c732 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1106,6 +1106,14 @@ Status ImpalaServer::ExecuteInternal(
     }
 #endif
 
+    size_t statement_length = query_ctx.client_request.stmt.length();
+    int32_t max_statement_length =
+        query_ctx.client_request.query_options.max_statement_length_bytes;
+    if (max_statement_length > 0 && statement_length > max_statement_length) {
+      return Status(ErrorMsg(TErrorCode::MAX_STATEMENT_LENGTH_EXCEEDED,
+          statement_length, max_statement_length));
+    }
+
     RETURN_IF_ERROR((*request_state)->UpdateQueryStatus(
         exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
 
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 79f50cc..715422d 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -161,7 +161,10 @@ TEST(QueryOptions, SetByteOptions) {
           {8 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
       {MAKE_OPTIONDEF(runtime_bloom_filter_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}};
+              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
+      {MAKE_OPTIONDEF(max_statement_length_bytes),
+          {MIN_MAX_STATEMENT_LENGTH_BYTES, I32_MAX}},
+  };
   TestByteCaseSet(options, case_set_i64);
   TestByteCaseSet(options, case_set_i32);
 }
@@ -239,6 +242,8 @@ TEST(QueryOptions, SetIntOptions) {
       {MAKE_OPTIONDEF(exec_time_limit_s),              {0, I32_MAX}},
       {MAKE_OPTIONDEF(thread_reservation_limit),       {-1, I32_MAX}},
       {MAKE_OPTIONDEF(thread_reservation_aggregate_limit), {-1, I32_MAX}},
+      {MAKE_OPTIONDEF(statement_expression_limit),
+          {MIN_STATEMENT_EXPRESSION_LIMIT, I32_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int32_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index a84dd49..d48d0ec 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -815,6 +815,32 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_default_transactional_type(enum_type);
         break;
       }
+      case TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT: {
+        StringParser::ParseResult result;
+        const int32_t statement_expression_limit =
+          StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS ||
+            statement_expression_limit < MIN_STATEMENT_EXPRESSION_LIMIT) {
+          return Status(Substitute("Invalid statement expression limit: $0 "
+              "Valid values are in [$1, $2]", value, MIN_STATEMENT_EXPRESSION_LIMIT,
+              std::numeric_limits<int32_t>::max()));
+        }
+        query_options->__set_statement_expression_limit(statement_expression_limit);
+        break;
+      }
+      case TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES: {
+        int64_t max_statement_length_bytes;
+        RETURN_IF_ERROR(ParseMemValue(value, "max statement length bytes",
+            &max_statement_length_bytes));
+        if (max_statement_length_bytes < MIN_MAX_STATEMENT_LENGTH_BYTES ||
+            max_statement_length_bytes > std::numeric_limits<int32_t>::max()) {
+          return Status(Substitute("Invalid maximum statement length: $0 "
+              "Valid values are in [$1, $2]", max_statement_length_bytes,
+              MIN_MAX_STATEMENT_LENGTH_BYTES, std::numeric_limits<int32_t>::max()));
+        }
+        query_options->__set_max_statement_length_bytes(max_statement_length_bytes);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 6bb7153..ff408dd 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DEFAULT_TRANSACTIONAL_TYPE + 1);\
+      TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -174,12 +174,21 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(spool_query_results, SPOOL_QUERY_RESULTS,\
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(default_transactional_type, DEFAULT_TRANSACTIONAL_TYPE,\
-      TQueryOptionLevel::ADVANCED)
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(statement_expression_limit, STATEMENT_EXPRESSION_LIMIT,\
+      TQueryOptionLevel::REGULAR)\
+  QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES,\
+      TQueryOptionLevel::REGULAR)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
-  static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
-  static const int64_t ROW_SIZE_LIMIT = 1LL << 40; // 1 TB
+static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
+static const int64_t ROW_SIZE_LIMIT = 1LL << 40; // 1 TB
+
+/// Limits on the query size are intended to be large. Prevent them from being set
+/// to small values (which can prevent clients from executing anything).
+static const int32_t MIN_STATEMENT_EXPRESSION_LIMIT = 1 << 10; // 1024
+static const int32_t MIN_MAX_STATEMENT_LENGTH_BYTES = 1 << 10; // 1 KB
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.  Options that
 /// aren't set and lack defaults in common/thrift/ImpalaInternalService.thrift are
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 3e12895..be79822 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -367,6 +367,18 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   87: optional TTransactionalType default_transactional_type = TTransactionalType.NONE;
+
+  // See comment in ImpalaService.thrift.
+  // The default of 250,000 is set to a high value to avoid impacting existing users, but
+  // testing indicates a statement with this number of expressions can run.
+  88: optional i32 statement_expression_limit = 250000
+
+  // See comment in ImpalaService.thrift
+  // The default is set to 16MB. It is likely that a statement of this size would exceed
+  // the statement expression limit. Setting a limit on the total statement size avoids
+  // the cost of parsing and analyzing the statement, which is required to enforce the
+  // statement expression limit.
+  89: optional i32 max_statement_length_bytes = 16777216
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
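
As an aside (not part of the patch): both limits are plain query options, so a
client can tune them per session, subject to the minimums enforced in
query-options.h. A minimal sketch using impyla; the host/port and the chosen
values are illustrative assumptions:

    from impala.dbapi import connect

    conn = connect(host="impalad-host", port=21050)  # hypothetical endpoint
    cur = conn.cursor()
    # Values must stay within [1024, INT32_MAX] expressions and
    # [1 KB, INT32_MAX] bytes, per the validation added above.
    cur.execute("SET STATEMENT_EXPRESSION_LIMIT=50000")
    cur.execute("SET MAX_STATEMENT_LENGTH_BYTES=1048576")  # 1 MB
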
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index ae03051..d5c57aa 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -415,6 +415,20 @@ enum TImpalaQueryOptions {
  // Specifies the default transactional type for new HDFS tables.
   // Valid values: none, insert_only
   DEFAULT_TRANSACTIONAL_TYPE = 86
+
+  // Limit on the total number of expressions in the statement. Statements that exceed
+  // the limit will get an error during analysis. This is intended to set an upper
+  // bound on the complexity of statements to avoid resource impacts such as excessive
+  // time in analysis or codegen. This is enforced only for the first pass of analysis
+  // before any rewrites are applied.
+  STATEMENT_EXPRESSION_LIMIT = 87
+
+  // Limit on the total length of a SQL statement. Statements that exceed the maximum
+  // length will get an error before parsing/analysis. This is complementary to the
+  // statement expression limit, because statements of a certain size are highly
+  // likely to violate the statement expression limit. Rejecting them early avoids
+  // the cost of parsing/analysis.
+  MAX_STATEMENT_LENGTH_BYTES = 88
 }
 
 // The summary of a DML statement.
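
A quick back-of-envelope check (editorial, derived from the defaults above)
shows why the two limits are complementary: a statement at the 16 MB length cap
would need to average under ~67 bytes per expression to stay within the 250,000
expression limit, so oversized statements can be rejected cheaply by length
alone:

    # Defaults taken from the patch; the arithmetic is illustrative only.
    max_statement_length_bytes = 16 * 1024 * 1024  # 16777216
    statement_expression_limit = 250000
    print(max_statement_length_bytes / float(statement_expression_limit))
    # ~67.1 bytes per expression when at both limits simultaneously
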
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index bb45c3d..d9d4d68 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -430,6 +430,9 @@ error_codes = (
   ("LZ4_DECOMPRESS_SAFE_FAILED", 141, "LZ4: LZ4_decompress_safe failed"),
 
   ("LZ4_COMPRESS_DEFAULT_FAILED", 142, "LZ4: LZ4_compress_default failed"),
+
+  ("MAX_STATEMENT_LENGTH_EXCEEDED", 143, "Statement length of $0 bytes exceeds the "
+   "maximum statement length ($1 bytes)"),
 )
 
 import sys
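
For readers unfamiliar with the error-code templates: the $N placeholders are
filled positionally when the error is raised. A hedged Python mimic (not the
actual Substitute implementation) of how error 143 renders:

    TEMPLATE = ("Statement length of $0 bytes exceeds the maximum statement "
                "length ($1 bytes)")

    def substitute(template, *args):
        # Positional replacement; good enough for numeric arguments.
        for i, arg in enumerate(args):
            template = template.replace("$%d" % i, str(arg))
        return template

    print(substitute(TEMPLATE, 16777217, 16777216))
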
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index d5312f9..29cb6f8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -454,6 +454,14 @@ public class AnalysisContext {
     Preconditions.checkNotNull(analysisResult_.stmt_);
     analysisResult_.analyzer_ = createAnalyzer(stmtTableCache);
     analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
+    // Enforce the statement expression limit at the end of analysis so that there is an
+    // accurate count of the total number of expressions. The first analyze() call is not
+    // very expensive (~seconds) even for large statements. The limit on the total length
+    // of the SQL statement (max_statement_length_bytes) provides an upper bound.
+    // It is important to enforce this before expression rewrites, because rewrites are
+    // expensive with large expression trees. For example, a SQL that takes a few seconds
+    // to analyze the first time may take 10 minutes for rewrites.
+    analysisResult_.analyzer_.checkStmtExprLimit();
     boolean isExplain = analysisResult_.isExplainStmt();
 
     // Apply expr and subquery rewrites.
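
To summarize the ordering this change establishes, here is an editorial sketch
(not Impala code; parse/analyze/rewrite_and_plan stand in for the real frontend
phases): the cheap length check runs before parsing, and the expression-count
check runs after the first analysis pass but before the expensive rewrites.

    class Options(object):
        max_statement_length_bytes = 16 * 1024 * 1024
        statement_expression_limit = 250000

    def compile_statement(sql, opts, parse, analyze, rewrite_and_plan):
        if len(sql) > opts.max_statement_length_bytes:
            raise ValueError("statement too long")    # rejected before parsing
        stmt = parse(sql)
        num_exprs = analyze(stmt)                     # first, cheap analysis pass
        if num_exprs > opts.statement_expression_limit:
            raise ValueError("too many expressions")  # rejected before rewrites
        return rewrite_and_plan(stmt)
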
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index f0656ec..75c1fbe 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -455,6 +455,12 @@ public class Analyzer {
     // Expr rewriter for normalizing and rewriting expressions.
     private final ExprRewriter exprRewriter_;
 
+    // Total number of expressions across the statement (including all subqueries). This
+    // is used to enforce a limit on the total number of expressions. Incremented by
+    // incrementNumStmtExprs(). Note that this does not include expressions that do not
+    // require analysis (e.g. some literal expressions).
+    private int numStmtExprs_ = 0;
+
     public GlobalState(StmtTableCache stmtTableCache, TQueryCtx queryCtx,
         AuthorizationFactory authzFactory) {
       this.stmtTableCache = stmtTableCache;
@@ -2847,6 +2853,17 @@ public class Analyzer {
   public int decrementCallDepth() { return --callDepth_; }
   public int getCallDepth() { return callDepth_; }
 
+  public int incrementNumStmtExprs() { return globalState_.numStmtExprs_++; }
+  public int getNumStmtExprs() { return globalState_.numStmtExprs_; }
+  public void checkStmtExprLimit() throws AnalysisException {
+    int statementExpressionLimit = getQueryOptions().getStatement_expression_limit();
+    if (getNumStmtExprs() > statementExpressionLimit) {
+      String errorStr = String.format("Exceeded the statement expression limit (%d)\n" +
+          "Statement has %d expressions.", statementExpressionLimit, getNumStmtExprs());
+      throw new AnalysisException(errorStr);
+    }
+  }
+
   public boolean hasMutualValueTransfer(SlotId a, SlotId b) {
     return hasValueTransfer(a, b) && hasValueTransfer(b, a);
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 53a0081..1cfa938 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -347,6 +347,9 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   // analysisDone().
   private boolean isAnalyzed_ = false;
 
+  // True if this has already been counted towards the number of statement expressions
+  private boolean isCountedForNumStmtExprs_ = false;
+
   protected Expr() {
     type_ = Type.INVALID;
     selectivity_ = -1.0;
@@ -369,6 +372,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     numDistinctValues_ = other.numDistinctValues_;
     isConstant_ = other.isConstant_;
     fn_ = other.fn_;
+    isCountedForNumStmtExprs_ = other.isCountedForNumStmtExprs_;
     children_ = Expr.cloneList(other.children_);
   }
 
@@ -420,6 +424,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         throw new AnalysisException(String.format("Exceeded the maximum depth of an " +
             "expression tree (%s).", EXPR_DEPTH_LIMIT));
       }
+      incrementNumStmtExprs(analyzer);
     }
     for (Expr child: children_) {
       child.analyze(analyzer);
@@ -453,6 +458,24 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   }
 
   /**
+   * Helper function to properly count the number of statement expressions.
+   * If this expression has not been counted already and this is not a WITH clause,
+   * increment the number of statement expressions. This function guarantees that an
+   * expression will be counted at most once.
+   */
+  private void incrementNumStmtExprs(Analyzer analyzer) {
+    // WITH clauses use a separate Analyzer with its own GlobalState. Skip counting
+    // this expression towards that GlobalState. If the view defined by the WITH
+    // clause is referenced, it will be counted during that analysis.
+    if (analyzer.hasWithClause()) return;
+    // If the expression is already counted, do not count it again. This is important
+    // for expressions that can be cloned (e.g. when doing Expr::trySubstitute()).
+    if (isCountedForNumStmtExprs_) return;
+    analyzer.incrementNumStmtExprs();
+    isCountedForNumStmtExprs_ = true;
+  }
+
+  /**
    * Compute and return evalcost of this expr given the evalcost of all children has been
    * computed. Should be called bottom-up whenever the structure of subtree is modified.
    */
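
The flag-based counting above guarantees each Expr is counted at most once,
even across clones and re-analysis. A minimal self-contained sketch of the
idea, with simplified names (not the actual Java classes):

    class Analyzer(object):
        def __init__(self):
            self.num_stmt_exprs = 0
            self.has_with_clause = False

    class Expr(object):
        def __init__(self):
            self.children = []
            self.counted = False

        def clone(self):
            c = Expr()
            c.counted = self.counted  # mirrors the copy constructor above
            c.children = [child.clone() for child in self.children]
            return c

        def analyze(self, analyzer):
            if not analyzer.has_with_clause and not self.counted:
                analyzer.num_stmt_exprs += 1
                self.counted = True
            for child in self.children:
                child.analyze(analyzer)
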
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index 23910f4..5fcb747 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -2399,6 +2399,165 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     testFuncExprDepthLimit("cast(", "1", " as int)");
   }
 
+  /** Generates a specific number of expressions by repeating a column reference.
+   * It generates a string containing 'num_copies' expressions. If 'use_alias' is true,
+   * each column reference has a distinct alias. This allows the repeated columns to be
+   * used in places that require distinct names (e.g. the definition of a view).
+   */
+  String getRepeatedColumnReference(String col_name, int num_copies, boolean use_alias) {
+    StringBuilder repColRef = new StringBuilder();
+
+    for (int i = 0; i < num_copies; ++i) {
+      if (i != 0) repColRef.append(", ");
+      repColRef.append(col_name);
+      if (use_alias) repColRef.append(String.format(" alias%d", i));
+    }
+    return repColRef.toString();
+  }
+
+  /** Get the error message for a statement with 'actual_num_expressions' that exceeds
+   * the statement_expression_limit 'limit'.
+   */
+  String getExpressionLimitErrorStr(int actual_num_expressions, int limit) {
+    return String.format("Exceeded the statement expression limit (%d)\n" +
+        "Statement has %d expressions", limit, actual_num_expressions);
+  }
+
+  @Test
+  public void TestStatementExprLimit() {
+    // To keep it simple and fast, we use a low value for statement_expression_limit.
+    // Since the statement_expression_limit is evaluated before rewrites, turn off
+    // expression rewrites.
+    TQueryOptions queryOpts = new TQueryOptions();
+    queryOpts.setStatement_expression_limit(20);
+    queryOpts.setEnable_expr_rewrites(false);
+    AnalysisContext ctx = createAnalysisCtx(queryOpts);
+
+    // Known SQL patterns (repeated reference to column) that generate 20 and 21
+    // expressions, respectively.
+    String repCols20 = getRepeatedColumnReference("int_col", 20, true);
+    String repCols21 = getRepeatedColumnReference("int_col", 21, true);
+
+    // Select from table
+    AnalyzesOk(String.format("select %s from functional.alltypes", repCols20), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    AnalysisError(String.format("select %s from functional.alltypes", repCols21),
+        ctx, getExpressionLimitErrorStr(21, 20));
+
+    // WHERE clause
+    // This statement has 20 expressions without the WHERE clause, as tested above.
+    // So, this will only be an error if the bool_col in the WHERE clause counts.
+    AnalysisError(String.format("select %s from functional.alltypes where bool_col",
+        repCols20), ctx, getExpressionLimitErrorStr(21, 20));
+
+    // Create table as select
+    AnalyzesOk(String.format("create table exprlimit1 as " +
+        "select %s from functional.alltypes", repCols20), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    AnalysisError(String.format("create table exprlimit1 as " +
+        "select %s from functional.alltypes", repCols21), ctx,
+        getExpressionLimitErrorStr(21, 20));
+
+    // Create view
+    AnalyzesOk(String.format("create view exprlimit1 as " +
+        "select %s from functional.alltypes", repCols20), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    AnalysisError(String.format("create view exprlimit1 as " +
+        "select %s from functional.alltypes", repCols21), ctx,
+        getExpressionLimitErrorStr(21, 20));
+
+    // Subquery
+    AnalyzesOk(String.format("select * from (select %s from functional.alltypes) x",
+        repCols20), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    AnalysisError(String.format("select * from (select %s from functional.alltypes) x",
+        repCols21), ctx, getExpressionLimitErrorStr(21, 20));
+
+    // WITH clause
+    AnalyzesOk(String.format("with v as (select %s from functional.alltypes) " +
+        "select * from v", repCols20), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    AnalysisError(String.format("with v as (select %s from functional.alltypes) " +
+        "select * from v", repCols21), ctx, getExpressionLimitErrorStr(21, 20));
+
+    // View is counted (functional.alltypes_parens has a few expressions)
+    AnalysisError(String.format("select %s from functional.alltypes_parens", repCols20),
+        ctx, getExpressionLimitErrorStr(32, 20));
+
+    // If a view is referenced multiple times, it counts multiple times.
+    AnalysisError(String.format("with v as (select %s from functional.alltypes) " +
+        "select * from v v1,v v2", repCols20), ctx, getExpressionLimitErrorStr(40, 20));
+
+    // If a view is not referenced, it doesn't count.
+    AnalyzesOk(String.format("with v as (select %s from functional.alltypes) select 1",
+        repCols21), ctx);
+    // This is zero because literals do not count.
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 0);
+
+    // EXPLAIN is not exempted from the limit
+    AnalysisError(String.format("explain select %s from functional.alltypes", repCols21),
+        ctx, getExpressionLimitErrorStr(21, 20));
+
+    // Star expansion of a wide table does not count towards the limit.
+    AnalyzesOk("select * from functional.widetable_1000_cols", ctx);
+
+    // Literal expressions don't count towards the limit, so build a list of literals
+    // to use to test various cases.
+    StringBuilder literalList = new StringBuilder();
+    for (int i = 0; i < 200; ++i) {
+      if (i != 0) literalList.append(", ");
+      literalList.append(i);
+    }
+
+    // Literals in the select list do not count
+    AnalyzesOk(String.format("select %s", literalList.toString()), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 0);
+
+    // Literals in an IN list do not count
+    AnalyzesOk(String.format("select int_col IN (%s) from functional.alltypes",
+        literalList.toString()), ctx);
+
+    // Literals in a VALUES clause do not count
+    StringBuilder valuesList = new StringBuilder();
+    for (int i = 0; i < 200; ++i) {
+      if (i != 0) valuesList.append(", ");
+      valuesList.append("(" + i + ")");
+    }
+    AnalyzesOk("insert into functional.insert_overwrite_nopart (col1) VALUES " +
+        valuesList.toString(), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 0);
+
+    // Expressions that are operators applied to constants still count
+    StringBuilder constantExpr = new StringBuilder();
+    for (int i = 0; i < 21; ++i) {
+      if (i != 0) constantExpr.append(" * ");
+      constantExpr.append(i);
+    }
+    // 21 literals with 20 multiplications requires 20 ArithmeticExprs
+    AnalyzesOk(String.format("select %s", constantExpr.toString()), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    constantExpr.append(" * 100");
+    // 22 literals with 21 multiplications requires 21 ArithmeticExprs
+    AnalysisError(String.format("select %s", constantExpr.toString()),
+        ctx, getExpressionLimitErrorStr(21, 20));
+
+    // Put a non-literal in an IN list to verify it still counts appropriately.
+    StringBuilder inListExpr = new StringBuilder();
+    for (int i = 0; i < 19; i++) {
+      if (i != 0) inListExpr.append(" * ");
+      inListExpr.append(i);
+    }
+    // 19 literals with 18 multiplications = 18 expressions. int_col and IN are another
+    // two expressions. This has 20 expressions total.
+    AnalyzesOk(String.format("select int_col IN (%s) from functional.alltypes",
+        inListExpr.toString()), ctx);
+    assertEquals(ctx.getAnalyzer().getNumStmtExprs(), 20);
+    // Adding one more expression pushes us over the limit.
+    inListExpr.append(" * 100");
+    AnalysisError(String.format("select int_col IN (%s) from functional.alltypes",
+        inListExpr.toString()), ctx, getExpressionLimitErrorStr(21, 20));
+  }
+
   // Verifies the resulting expr decimal type is expectedType under decimal v1 or
   // decimal v2.
   private void testDecimalExpr(String expr, Type decimalExpectedType, boolean isV2) {
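
For reference, a quick Python equivalent of the getRepeatedColumnReference
helper above, useful for reproducing these test inputs interactively (the
snippet mirrors the Java helper's behavior but is itself illustrative):

    def repeated_column_reference(col, n, use_alias=True):
        parts = []
        for i in range(n):
            parts.append(col + (" alias%d" % i if use_alias else ""))
        return ", ".join(parts)

    print(repeated_column_reference("int_col", 3))
    # int_col alias0, int_col alias1, int_col alias2
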
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 8b275fb..f1a33bb 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -41,6 +41,26 @@ LOG.propagate = False
 PROGRESS_LOG_RE = re.compile(
     r'^Query [a-z0-9:]+ [0-9]+% Complete \([0-9]+ out of [0-9]+\)$')
 
+MAX_SQL_LOGGING_LENGTH = 128 * 1024
+
+
+# test_exprs.py's TestExprLimits executes extremely large SQLs (multiple MBs). It is the
+# only test that runs SQL larger than 128KB. Logging these SQLs in execute() increases
+# the size of the JUnitXML files, causing problems for users of JUnitXML like Jenkins.
+# This function limits the size of the SQL logged if it is larger than 128KB.
+def log_sql_stmt(sql_stmt):
+  """If the 'sql_stmt' is shorter than MAX_SQL_LOGGING_LENGTH, log it unchanged. If
+     it is larger than MAX_SQL_LOGGING_LENGTH, truncate it and comment it out."""
+  if (len(sql_stmt) <= MAX_SQL_LOGGING_LENGTH):
+    LOG.info("{0};\n".format(sql_stmt))
+  else:
+    # The logging output should be valid SQL, so the truncated SQL is commented out.
+    LOG.info("-- Skip logging full SQL statement of length {0}".format(len(sql_stmt)))
+    LOG.info("-- Logging a truncated version, commented out:")
+    for line in sql_stmt[0:MAX_SQL_LOGGING_LENGTH].split("\n"):
+      LOG.info("-- {0}".format(line))
+    LOG.info("-- [...]")
+
 # Common wrapper around the internal types of HS2/Beeswax operation/query handles.
 class OperationHandle(object):
   def __init__(self, handle, sql_stmt):
@@ -180,11 +200,13 @@ class BeeswaxConnection(ImpalaConnection):
     self.__beeswax_client.close_dml(operation_handle.get_handle())
 
   def execute(self, sql_stmt, user=None):
-    LOG.info("-- executing against %s\n%s;\n" % (self.__host_port, sql_stmt))
+    LOG.info("-- executing against %s\n" % (self.__host_port))
+    log_sql_stmt(sql_stmt)
     return self.__beeswax_client.execute(sql_stmt, user=user)
 
   def execute_async(self, sql_stmt, user=None):
-    LOG.info("-- executing async: %s\n%s;\n" % (self.__host_port, sql_stmt))
+    LOG.info("-- executing async: %s\n" % (self.__host_port))
+    log_sql_stmt(sql_stmt)
     beeswax_handle = self.__beeswax_client.execute_query_async(sql_stmt, user=user)
     return OperationHandle(beeswax_handle, sql_stmt)
 
@@ -312,8 +334,9 @@ class ImpylaHS2Connection(ImpalaConnection):
         return r
 
   def execute_async(self, sql_stmt, user=None):
-    LOG.info("-- executing against {0} at {1}\n{2};\n".format(
-        self._is_hive and 'Hive' or 'Impala', self.__host_port, sql_stmt))
+    LOG.info("-- executing against {0} at {1}\n".format(
+        self._is_hive and 'Hive' or 'Impala', self.__host_port))
+    log_sql_stmt(sql_stmt)
     if user is not None:
       raise NotImplementedError("Not yet implemented for HS2 - authentication")
     try:
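
For a sense of what the truncation produces, a hedged example (assuming the
log_sql_stmt() defined above is importable and logging is configured):

    stmt = "select " + ", ".join(str(i) for i in xrange(500000))  # several MB
    log_sql_stmt(stmt)
    # Expected log shape:
    #   -- Skip logging full SQL statement of length <N>
    #   -- Logging a truncated version, commented out:
    #   -- select 0, 1, 2, ...
    #   -- [...]
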
diff --git a/tests/query_test/test_exprs.py b/tests/query_test/test_exprs.py
index eb50ee7..6722ca8 100644
--- a/tests/query_test/test_exprs.py
+++ b/tests/query_test/test_exprs.py
@@ -16,6 +16,8 @@
 # under the License.
 
 import pytest
+import re
+from random import randint
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_exec_option_dimension
@@ -131,6 +133,67 @@ class TestExprLimits(ImpalaTestSuite):
     cast_query = "select " + self.__gen_deep_func_expr("cast(", "1", " as int)")
     self.__exec_query(cast_query)
 
+  def test_under_statement_expression_limit(self):
+    """Generate a huge case statement that barely fits within the statement expression
+       limit and verify that it runs."""
+    # This takes 20+ minutes, so only run it on exhaustive.
+    # TODO: Determine whether this needs to run serially. It uses >5 GB of memory.
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip("Only test limit of codegen on exhaustive")
+    case = self.__gen_huge_case("int_col", 32, 2, "  ")
+    query = "select {0} as huge_case from functional_parquet.alltypes".format(case)
+    self.__exec_query(query)
+
+  def test_max_statement_size(self):
+    """Generate a huge case statement that exceeds the default 16MB limit and verify
+       that it gets rejected."""
+
+    expected_err_tmpl = ("Statement length of {0} bytes exceeds the maximum "
+        "statement length \({1} bytes\)")
+    size_16mb = 16 * 1024 * 1024
+
+    # Case 1: a valid SQL that would parse correctly
+    case = self.__gen_huge_case("int_col", 75, 2, "  ")
+    query = "select {0} as huge_case from functional.alltypes".format(case)
+    err = self.execute_query_expect_failure(self.client, query)
+    assert re.search(expected_err_tmpl.format(len(query), size_16mb), str(err))
+
+    # Case 2: a string of 'a' characters that does not parse. This will still fail
+    # with the same message, because the check is before parsing.
+    invalid_sql = 'a' * (size_16mb + 1)
+    err = self.execute_query_expect_failure(self.client, invalid_sql)
+    assert re.search(expected_err_tmpl.format(len(invalid_sql), size_16mb), str(err))
+
+  def test_statement_expression_limit(self):
+    """Generate a huge case statement that barely fits within the 16MB limit but exceeds
+       the statement expression limit. Verify that it fails."""
+    case = self.__gen_huge_case("int_col", 66, 2, "  ")
+    query = "select {0} as huge_case from functional.alltypes".format(case)
+    assert len(query) < 16 * 1024 * 1024
+    expected_err_re = ("Exceeded the statement expression limit \({0}\)\n"
+        "Statement has .* expressions.").format(250000)
+    err = self.execute_query_expect_failure(self.client, query)
+    assert re.search(expected_err_re, str(err))
+
+  def __gen_huge_case(self, col_name, fanout, depth, indent):
+    toks = ["case\n"]
+    for i in xrange(fanout):
+      add = randint(1, 1000000)
+      divisor = randint(1, 10000000)
+      mod = randint(0, divisor)
+      # Generate a mathematical expr that can't be easily optimised out.
+      when_expr = "{0} + {1} % {2} = {3}".format(col_name, add, divisor, mod)
+      if depth == 0:
+        then_expr = "{0}".format(i)
+      else:
+        then_expr = "({0})".format(
+            self.__gen_huge_case(col_name, fanout, depth - 1, indent + "  "))
+      toks.append(indent)
+      toks.append("when {0} then {1}\n".format(when_expr, then_expr))
+    toks.append(indent)
+    toks.append("end")
+    return ''.join(toks)
+
   def __gen_deep_infix_expr(self, prefix, repeat_suffix):
     expr = prefix
     for i in xrange(self.EXPR_DEPTH_LIMIT - 1):
@@ -150,8 +213,8 @@ class TestExprLimits(ImpalaTestSuite):
     try:
       impala_ret = self.execute_query(sql_str)
       assert impala_ret.success, "Failed to execute query %s" % (sql_str)
-    except: # consider any exception a failure
-      assert False, "Failed to execute query %s" % (sql_str)
+    except Exception as e:  # consider any exception a failure
+      assert False, "Failed to execute query %s: %s" % (sql_str, e)
 
 class TestUtcTimestampFunctions(ImpalaTestSuite):
   """Tests for UTC timestamp functions, i.e. functions that do not depend on the behavior