Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/13 03:12:00 UTC

[impala] branch master updated (760169e -> 2df3b8c)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 760169e  IMPALA-8791: Handle the case where there is no fragment scheduled on the coordinator
     new 6cde6d3  [DOCS] Put impala_date doc in the right place in the alphabetical order
     new 8094811  IMPALA-8766: Undo hadoop-cloud-storage + HWX Nexus
     new 2df3b8c  Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/coordinator-backend-state.cc        | 15 +----
 be/src/runtime/coordinator-backend-state.h         |  8 +--
 be/src/runtime/coordinator.cc                      | 12 ++--
 be/src/runtime/coordinator.h                       |  4 +-
 be/src/scheduling/admission-controller-test.cc     | 33 ++++-------
 be/src/scheduling/admission-controller.cc          | 61 +++++++++++---------
 be/src/scheduling/query-schedule.cc                | 57 ++++++++----------
 be/src/scheduling/query-schedule.h                 | 43 +++++++-------
 be/src/scheduling/scheduler.cc                     | 15 +++--
 be/src/scheduling/scheduler.h                      |  5 +-
 bin/impala-config.sh                               |  1 +
 docs/impala.ditamap                                |  2 +-
 fe/pom.xml                                         | 67 ++++++++++------------
 .../java/org/apache/impala/planner/Planner.java    | 16 ++----
 impala-parent/pom.xml                              | 29 ++++++----
 .../QueryTest/dedicated-coord-mem-estimates.test   |  8 +--
 16 files changed, 172 insertions(+), 204 deletions(-)


[impala] 03/03: Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2df3b8cf82af66199f5851c84f3aa065577f6d7d
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Aug 12 17:57:32 2019 -0700

    Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"
    
    This reverts commit 760169edcbca438c5964380a604b6c271c6bd1a3.
    
    Change-Id: Id20cf3581995f450de6f491e7874cbcf23b52cda
    Reviewed-on: http://gerrit.cloudera.org:8080/14052
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc        | 15 +-----
 be/src/runtime/coordinator-backend-state.h         |  8 +--
 be/src/runtime/coordinator.cc                      | 12 +++--
 be/src/runtime/coordinator.h                       |  4 +-
 be/src/scheduling/admission-controller-test.cc     | 33 ++++--------
 be/src/scheduling/admission-controller.cc          | 61 ++++++++++++----------
 be/src/scheduling/query-schedule.cc                | 57 +++++++++-----------
 be/src/scheduling/query-schedule.h                 | 43 ++++++++-------
 be/src/scheduling/scheduler.cc                     | 15 +++---
 be/src/scheduling/scheduler.h                      |  5 +-
 .../java/org/apache/impala/planner/Planner.java    | 16 +++---
 .../QueryTest/dedicated-coord-mem-estimates.test   |  8 +--
 12 files changed, 123 insertions(+), 154 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 60e67c1..028dd70 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -69,8 +69,8 @@ void Coordinator::BackendState::Init(
     const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
     RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
-  host_ = backend_exec_params_->be_desc.address;
-  krpc_host_ = backend_exec_params_->be_desc.krpc_address;
+  host_ = backend_exec_params_->instance_params[0]->host;
+  krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
   host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
@@ -187,14 +187,6 @@ void Coordinator::BackendState::Exec(
     last_report_time_ms_ = GenerateReportTimestamp();
     exec_complete_barrier->Notify();
   });
-
-  // Don not issue an ExecQueryFInstances RPC if there are no fragment instances
-  // scheduled to run on this backend.
-  if (IsEmptyBackend()) {
-    DCHECK(backend_exec_params_->is_coord_backend);
-    return;
-  }
-
   std::unique_ptr<ControlServiceProxy> proxy;
   Status get_proxy_status = ControlService::GetProxy(krpc_host_, host_.hostname, &proxy);
   if (!get_proxy_status.ok()) {
@@ -357,7 +349,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     const ReportExecStatusRequestPB& backend_exec_status,
     const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
     ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
-  DCHECK(!IsEmptyBackend());
   // Hold the exec_summary's lock to avoid exposing it half-way through
   // the update loop below.
   lock_guard<SpinLock> l1(exec_summary->lock);
@@ -462,7 +453,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
 
 void Coordinator::BackendState::UpdateHostProfile(
     const TRuntimeProfileTree& thrift_profile) {
-  DCHECK(!IsEmptyBackend());
   host_profile_->Update(thrift_profile);
 }
 
@@ -544,7 +534,6 @@ bool Coordinator::BackendState::Cancel() {
 }
 
 void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
-  DCHECK(!IsEmptyBackend());
   DCHECK(rpc_params.dst_query_id == query_id());
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 4b0cfad..4c41c8c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -52,9 +52,7 @@ class ExecSummary;
 struct FInstanceExecParams;
 
 /// This class manages all aspects of the execution of all fragment instances of a
-/// single query on a particular backend. For the coordinator backend its possible to have
-/// no fragment instances scheduled on it. In that case, no RPCs are issued and the
-/// Backend state transitions to 'done' state right after Exec() is called on it.
+/// single query on a particular backend.
 /// Thread-safe unless pointed out otherwise.
 class Coordinator::BackendState {
  public:
@@ -74,7 +72,6 @@ class Coordinator::BackendState {
   /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
   /// communicated through GetStatus(). Uses filter_routing_table to remove filters
   /// that weren't selected during its construction.
-  /// No RPC is issued if there are no fragment instances scheduled on this backend.
   /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
   /// on their node_id/instance_idx.
   void Exec(const DebugOptions& debug_options,
@@ -162,9 +159,6 @@ class Coordinator::BackendState {
   /// Returns a timestamp using monotonic time for tracking arrival of status reports.
   static int64_t GenerateReportTimestamp() { return MonotonicMillis(); }
 
-  /// Returns True if there are no fragment instances scheduled on this backend.
-  bool IsEmptyBackend() { return backend_exec_params_->instance_params.empty(); }
-
  private:
   /// Execution stats for a single fragment instance.
   /// Not thread-safe.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4df15d9..bd4fb75 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,8 +111,14 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
+  // If there is no coord fragment then pick the mem limit computed for an executor.
+  // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
+  // fragments are scheduled to run on the coordinator backend.
+  int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
+      schedule_.coord_backend_mem_limit() :
+      schedule_.per_backend_mem_limit();
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx(), schedule_.coord_backend_mem_limit());
+      query_ctx(), coord_mem_limit);
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
@@ -405,10 +411,6 @@ Status Coordinator::FinishBackendStartup() {
       max_latency_host = TNetworkAddressToString(backend_state->impalad_address());
     }
     latencies.Update(backend_state->rpc_latency());
-    // Mark backend complete if no fragment instances were assigned to it.
-    if (backend_state->IsEmptyBackend()) {
-      backend_exec_complete_barrier_->Notify();
-    }
   }
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 72ab3af..9098dd0 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -482,9 +482,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Helper for Exec(). Checks for errors encountered when starting backend execution,
   /// using any non-OK status, if any, as the overall status. Returns the overall
-  /// status. Also updates query_profile_ with the startup latency histogram and the
-  /// backend_exec_complete_barrier_ if there is any backend which is already done (only
-  /// possible at this point if no fragment instances were assigned to it).
+  /// status. Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 17c8457..a241525 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -97,10 +97,11 @@ class AdmissionControllerTest : public testing::Test {
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
     query_options->__set_mem_limit(mem_limit);
     QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
-        *query_id, *request, *query_options, profile));
+        *query_id, *request, *query_options, is_dedicated_coord, profile));
     query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
-    SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord);
     query_schedule->UpdateMemoryRequirements(config);
+
+    SetHostsInQuerySchedule(*query_schedule, num_hosts);
     return query_schedule;
   }
 
@@ -112,24 +113,14 @@ class AdmissionControllerTest : public testing::Test {
   }
 
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
-  /// Note: no FInstanceExecParams are added so
-  /// QuerySchedule::UseDedicatedCoordEstimates() would consider this schedule as not
-  /// having anything scheduled on the backend which would result in always returning true
-  /// if a dedicated coordinator backend exists.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
-      bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
-      int64_t admit_mem_limit = 200L * MEGABYTE) {
+      int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
     PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
     for (int i = 0; i < count; ++i) {
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+      backend_exec_params->admit_mem_limit = admit_mem_limit;
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
-      backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit);
-      backend_exec_params->be_desc.__set_is_executor(true);
-      if (i == 0) {
-        // Add first element as the coordinator.
-        backend_exec_params->is_coord_backend = true;
-        backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord);
-      }
+      if (i == 0) backend_exec_params->contains_coord_fragment = true;
       const string host_name = Substitute("host$0", i);
       per_backend_exec_params->emplace(
           MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -488,7 +479,7 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
 
   // Adjust the QuerySchedule to have minimum memory reservation of 45MB.
   // This will be rejected immediately as minimum memory reservation is too high.
-  SetHostsInQuerySchedule(*query_schedule, host_count, false, 45L * MEGABYTE);
+  SetHostsInQuerySchedule(*query_schedule, host_count, 45L * MEGABYTE);
   string rejected_reserved_reason;
   ASSERT_TRUE(admission_controller->RejectForSchedule(
       *query_schedule, config_d, host_count, host_count, &rejected_reserved_reason));
@@ -808,19 +799,17 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
   // Add coordinator backend.
   BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
-  coord_exec_params->is_coord_backend = true;
+  coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
+  coord_exec_params->contains_coord_fragment = true;
   coord_exec_params->thread_reservation = 1;
-  coord_exec_params->be_desc.__set_admit_mem_limit(512 * MEGABYTE);
-  coord_exec_params->be_desc.__set_is_executor(false);
   const string coord_host_name = Substitute("host$0", 1);
   TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
   const string coord_host = TNetworkAddressToString(coord_addr);
   per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
   // Add executor backend.
   BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+  backend_exec_params->admit_mem_limit = GIGABYTE;
   backend_exec_params->thread_reservation = 1;
-  backend_exec_params->be_desc.__set_admit_mem_limit(GIGABYTE);
-  backend_exec_params->be_desc.__set_is_executor(true);
   const string exec_host_name = Substitute("host$0", 2);
   TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
   const string exec_host = TNetworkAddressToString(exec_addr);
@@ -864,7 +853,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
   // coord's admit_mem_limit is less than mem_to_admit on the coord.
   // Re-using previous QuerySchedule object.
-  coord_exec_params->be_desc.__set_admit_mem_limit(100 * MEGABYTE);
+  coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
   (*per_backend_exec_params)[coord_addr] = *coord_exec_params;
   query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
   ASSERT_TRUE(admission_controller->RejectForSchedule(
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 666659b..d31c50b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,6 +56,8 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
+DECLARE_bool(is_executor);
+
 namespace impala {
 
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -396,7 +398,7 @@ void AdmissionController::UpdateHostStats(
   int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
-    int64_t mem_to_admit = entry.second.is_coord_backend ?
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
         schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
     if (!is_admitting) mem_to_admit *= -1;
     const string host = TNetworkAddressToString(host_addr);
@@ -439,8 +441,9 @@ bool AdmissionController::CanAccommodateMaxInitialReservation(
   const int64_t coord_min_reservation = schedule.coord_min_reservation();
   return CanMemLimitAccommodateReservation(
              executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
-      && CanMemLimitAccommodateReservation(
-             coord_mem_limit, coord_min_reservation, mem_unavailable_reason);
+      && (!schedule.requiresCoordinatorFragment()
+             || CanMemLimitAccommodateReservation(
+                    coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
 }
 
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -480,11 +483,11 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit;
+    int64_t admit_mem_limit = entry.second.admit_mem_limit;
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
-    int64_t mem_to_admit = entry.second.is_coord_backend ?
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
         coord_mem_to_admit : executor_mem_to_admit;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
@@ -516,7 +519,7 @@ bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_num_queries_limit = entry.second.be_desc.admit_num_queries_limit;
+    int64_t admit_num_queries_limit = entry.second.admit_num_queries_limit;
     int64_t num_admitted = host_stats_[host_id].num_admitted;
     VLOG_ROW << "Checking available slot on host=" << host_id
              << " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
@@ -637,21 +640,21 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
       nullptr, std::numeric_limits<int64_t>::max());
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
-    const BackendExecParams& bp = e.second;
-    cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
-    if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
-      largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
+    cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
+    if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+      largest_min_mem_reservation =
+          make_pair(&e.first, e.second.min_mem_reservation_bytes);
     }
-    cluster_thread_reservation += bp.thread_reservation;
-    if (bp.thread_reservation > max_thread_reservation.second) {
-      max_thread_reservation = make_pair(&e.first, bp.thread_reservation);
+    cluster_thread_reservation += e.second.thread_reservation;
+    if (e.second.thread_reservation > max_thread_reservation.second) {
+      max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
     }
-    if (bp.is_coord_backend) {
+    if (e.second.contains_coord_fragment) {
       coord_admit_mem_limit.first = &e.first;
-      coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
-    } else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) {
+      coord_admit_mem_limit.second = e.second.admit_mem_limit;
+    } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
       min_executor_admit_mem_limit.first = &e.first;
-      min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
+      min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
     }
   }
 
@@ -721,15 +724,17 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
               TNetworkAddressToString(*min_executor_admit_mem_limit.first));
       return true;
     }
-    int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
-    VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
-             << coord_mem_to_admit
-             << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
-    if (coord_mem_to_admit > coord_admit_mem_limit.second) {
-      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
-          TNetworkAddressToString(*coord_admit_mem_limit.first));
-      return true;
+    if (schedule.requiresCoordinatorFragment()) {
+      int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+               << coord_mem_to_admit
+               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+            TNetworkAddressToString(*coord_admit_mem_limit.first));
+        return true;
+      }
     }
   }
   return false;
@@ -1143,9 +1148,11 @@ Status AdmissionController::ComputeGroupSchedules(
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy());
     DCHECK_GT(executor_group->NumExecutors(), 0);
+    bool is_dedicated_coord = !FLAGS_is_executor;
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
-            request.query_options, request.summary_profile, request.query_events);
+            request.query_options, is_dedicated_coord, request.summary_profile,
+            request.query_events);
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 86fe8fd..fceee52 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -43,26 +43,29 @@ DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
 namespace impala {
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, RuntimeProfile* summary_profile,
-    RuntimeProfile::EventSequence* query_events)
+    const TQueryOptions& query_options, bool is_dedicated_coord,
+    RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     query_events_(query_events),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   Init();
 }
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+    const TQueryOptions& query_options, bool is_dedicated_coord,
+    RuntimeProfile* summary_profile)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -86,7 +89,7 @@ void QuerySchedule::Init() {
 
   // mark coordinator fragment
   const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
-  if (RequiresCoordinatorFragment()) {
+  if (requiresCoordinatorFragment()) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
     next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -179,11 +182,7 @@ void QuerySchedule::Validate() const {
       }
     }
   }
-
-  for (const auto& elem: per_backend_exec_params_) {
-    const BackendExecParams& bp = elem.second;
-    DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
-  }
+  // TODO: add validation for BackendExecParams
 }
 
 int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
@@ -224,7 +223,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  DCHECK(RequiresCoordinatorFragment());
+  DCHECK(requiresCoordinatorFragment());
   const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
   const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
   DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -248,25 +247,20 @@ int QuerySchedule::GetNumFragmentInstances() const {
 }
 
 int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
-  // There will always be an entry for the coordinator in per_backend_exec_params_.
-  return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
-      + coord_backend_mem_to_admit();
+  if (!requiresCoordinatorFragment()) {
+    // For this case, there will be no coordinator fragment so only use the per
+    // executor mem to admit while accounting for admitted memory. This will also ensure
+    // the per backend mem admitted accounting is consistent with the cluster-wide mem
+    // admitted.
+    return per_backend_mem_to_admit() * per_backend_exec_params_.size();
+  } else {
+    return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+        + coord_backend_mem_to_admit();
+  }
 }
 
 bool QuerySchedule::UseDedicatedCoordEstimates() const {
-  for (auto& itr : per_backend_exec_params_) {
-    if (!itr.second.is_coord_backend) continue;
-    auto& coord = itr.second;
-    bool is_dedicated_coord = !coord.be_desc.is_executor;
-    bool only_coord_fragment_scheduled =
-        RequiresCoordinatorFragment() && coord.instance_params.size() == 1;
-    bool no_fragment_scheduled = coord.instance_params.size() == 0;
-    return FLAGS_use_dedicated_coordinator_estimates && is_dedicated_coord
-        && (only_coord_fragment_scheduled || no_fragment_scheduled);
-  }
-  DCHECK(false)
-      << "Coordinator backend should always have a entry in per_backend_exec_params_";
-  return false;
+  return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
 }
 
 void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
@@ -275,7 +269,6 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   // mem_limit if it is set in the query options, else sets it to -1 (no limit).
   bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
-  bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
 
   per_backend_mem_to_admit_ = 0;
   coord_backend_mem_to_admit_ = 0;
@@ -288,7 +281,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
 
   if (!has_query_option) {
     per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
-    coord_backend_mem_to_admit_ = use_dedicated_coord_estimates ?
+    coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
         GetDedicatedCoordMemoryEstimate() :
         GetPerExecutorMemoryEstimate();
     if (!mimic_old_behaviour) {
@@ -306,7 +299,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
-      if (!use_dedicated_coord_estimates || has_query_option) {
+      if (!UseDedicatedCoordEstimates() || has_query_option) {
         // The minimum mem limit option does not apply to dedicated coordinators -
         // this would result in over-reserving of memory. Treat coordinator and
         // executor mem limits the same if the query option was explicitly set.
@@ -328,7 +321,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
 
   // If the query is only scheduled to run on the coordinator.
-  if (per_backend_exec_params_.size() == 1 && RequiresCoordinatorFragment()) {
+  if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
     per_backend_mem_to_admit_ = 0;
   }
 
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6725774..ad169c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -45,9 +45,7 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
-/// Execution parameters for a single backend. This gets created for every backend that
-/// participates in query execution, which includes, every backend that has fragments
-/// scheduled on it and the coordinator backend. Computed by Scheduler::Schedule(), set
+/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
 /// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
 /// AdmissionController and a BackendState.
 struct BackendExecParams {
@@ -56,8 +54,6 @@ struct BackendExecParams {
   /// The fragment instance params assigned to this backend. All instances of a
   /// particular fragment are contiguous in this vector. Query lifetime;
   /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
-  /// This can be empty only for the coordinator backend, that is, if 'is_coord_backend'
-  /// is true.
   std::vector<const FInstanceExecParams*> instance_params;
 
   // The minimum query-wide buffer reservation size (in bytes) required for this backend.
@@ -78,8 +74,16 @@ struct BackendExecParams {
   // concurrently-executing fragment instances at any point in query execution.
   int64_t thread_reservation = 0;
 
-  // Indicates whether this backend is the coordinator.
-  bool is_coord_backend = false;
+  // The maximum bytes of memory that can be admitted to this backend by the
+  // admission controller. Obtained from the scheduler's executors configuration
+  // which is updated by membership updates from the statestore.
+  int64_t admit_mem_limit = 0;
+
+  // The maximum number of queries that this backend can execute concurrently.
+  int64_t admit_num_queries_limit = 0;
+
+  // Indicates whether this backend will run the coordinator fragment.
+  bool contains_coord_fragment = false;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -158,17 +162,18 @@ struct FragmentExecParams {
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
   /// For testing only: build a QuerySchedule object but do not run Init().
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile);
 
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
   /// - all scan ranges are assigned
-  /// - all BackendExecParams have instances assigned except for coordinator.
   void Validate() const;
 
   const TUniqueId& query_id() const { return query_id_; }
@@ -282,11 +287,6 @@ class QuerySchedule {
 
   /// Returns true if coordinator estimates calculated by the planner and specialized for
   /// dedicated a coordinator are to be used for estimating memory requirements.
-  /// This happens when the following conditions are true:
-  /// 1. Coordinator fragment is scheduled on a dedicated coordinator
-  /// 2. Either only the coordinator fragment or no fragments are scheduled on the
-  /// coordinator backend. This essentially means that no executor fragments are scheduled
-  /// on the coordinator backend.
   bool UseDedicatedCoordEstimates() const;
 
   /// Populates or updates the per host query memory limit and the amount of memory to be
@@ -299,6 +299,11 @@ class QuerySchedule {
 
   void set_executor_group(string executor_group);
 
+  /// Returns true if a coordinator fragment is required based on the query stmt type.
+  bool requiresCoordinatorFragment() const {
+    return request_.stmt_type == TStmtType::QUERY;
+  }
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -361,6 +366,9 @@ class QuerySchedule {
   /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
   int64_t coord_min_reservation_ = 0;
 
+  /// Indicates whether coordinator fragment will be running on a dedicated coordinator.
+  bool is_dedicated_coord_ = false;
+
   /// The name of the executor group that this schedule was computed for. Set by the
   /// Scheduler and only valid after scheduling completes successfully.
   string executor_group_;
@@ -369,11 +377,6 @@ class QuerySchedule {
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
-
-  /// Returns true if a coordinator fragment is required based on the query stmt type.
-  bool RequiresCoordinatorFragment() const {
-    return request_.stmt_type == TStmtType::QUERY;
-  }
 };
 }
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 467694b..b5e6807 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,22 +673,21 @@ void Scheduler::ComputeBackendExecParams(
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
       be_params.thread_reservation += f.fragment.thread_reservation;
+      if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
     }
   }
 
-  // This also ensures an entry always exists for the coordinator backend.
-  int64_t coord_min_reservation = 0;
-  const TNetworkAddress& coord_addr = executor_config.local_be_desc.address;
-  BackendExecParams& coord_be_params = per_backend_params[coord_addr];
-  coord_be_params.is_coord_backend = true;
-  coord_min_reservation = coord_be_params.min_mem_reservation_bytes;
-
   int64_t largest_min_reservation = 0;
+  int64_t coord_min_reservation = 0;
   for (auto& backend : per_backend_params) {
     const TNetworkAddress& host = backend.first;
     const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
+    backend.second.admit_mem_limit = be_desc.admit_mem_limit;
+    backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
     backend.second.be_desc = be_desc;
-    if (!backend.second.is_coord_backend) {
+    if (backend.second.contains_coord_fragment) {
+      coord_min_reservation = backend.second.min_mem_reservation_bytes;
+    } else {
       largest_min_reservation =
           max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
     }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 34ac10d..01ebb3f 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -343,9 +343,8 @@ class Scheduler {
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
       FragmentScanRangeAssignment* assignment);
 
-  /// Computes BackendExecParams for all backends assigned in the query and always one for
-  /// the coordinator backend since it participates in execution regardless. Must be
-  /// called after ComputeFragmentExecParams().
+  /// Computes BackendExecParams for all backends assigned in the query. Must be called
+  /// after ComputeFragmentExecParams().
   void ComputeBackendExecParams(
       const ExecutorConfig& executor_config, QuerySchedule* schedule);
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index dbcff87..b9d1f30 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -441,16 +441,12 @@ public class Planner {
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
         maxPerHostPeakResources.getThreadReservation());
-    if (getAnalysisResult().isQueryStmt()) {
-      request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
-          .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
-          DEDICATED_COORD_SAFETY_BUFFER_BYTES));
-    } else {
-      // For queries that don't have a coordinator fragment, estimate a small
-      // amount of memory that the query state spwaned on the coordinator can use.
-      request.setDedicated_coord_mem_estimate(totalRuntimeFilterMemBytes +
-          DEDICATED_COORD_SAFETY_BUFFER_BYTES);
-    }
+    // Assuming the root fragment will always run on the coordinator backend, which
+    // might not be true for queries that don't have a coordinator fragment
+    // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
+    request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+        .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+        DEDICATED_COORD_SAFETY_BUFFER_BYTES));
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
           maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
index 39b505f..cd53dac 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -4,8 +4,8 @@
 create table test as select id from functional.alltypes where id > 1
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
-row_regex: .*Cluster Memory Admitted: 132.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
 ====
 ---- QUERY
 # Truncate table to run the following inserts.
@@ -24,8 +24,8 @@ row_regex: .*Cluster Memory Admitted: 10.00 MB.*
 insert into test select id from functional.alltypes where id > 3
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
-row_regex: .*Cluster Memory Admitted: 132.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
 ====
 ---- QUERY
 # SELECT with merging exchange (i.e. order by).

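Note on the scheduler.cc hunk above: after this revert, ComputeBackendExecParams() no longer forces an entry for the coordinator backend. It derives the coordinator's minimum reservation from whichever backend has contains_coord_fragment set, and takes the maximum over all other backends. The following is a minimal standalone sketch of that loop, not the Impala implementation. BackendParamsSketch, ReservationSummary and SummarizeReservations are hypothetical names, and host addresses are plain strings instead of TNetworkAddress.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical, simplified stand-in for BackendExecParams; only the two
// fields that the loop in the diff reads are modelled here.
struct BackendParamsSketch {
  int64_t min_mem_reservation_bytes = 0;
  bool contains_coord_fragment = false;
};

// Hypothetical holder for the two values the loop computes.
struct ReservationSummary {
  int64_t coord_min_reservation = 0;
  int64_t largest_min_reservation = 0;
};

// Mirrors the post-revert loop: the backend running the coordinator fragment
// supplies coord_min_reservation; every other backend contributes to the max.
ReservationSummary SummarizeReservations(
    const std::map<std::string, BackendParamsSketch>& per_backend_params) {
  ReservationSummary out;
  for (const auto& backend : per_backend_params) {
    const BackendParamsSketch& params = backend.second;
    if (params.contains_coord_fragment) {
      out.coord_min_reservation = params.min_mem_reservation_bytes;
    } else {
      out.largest_min_reservation = std::max(
          out.largest_min_reservation, params.min_mem_reservation_bytes);
    }
  }
  return out;
}
```

In this sketch, if no backend carries a coordinator fragment (for example a statement whose stmt_type is not QUERY), coord_min_reservation simply stays 0, because no coordinator entry is inserted into the map.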

[impala] 01/03: [DOCS] Put impala_date doc in the right place in the alphabetical order

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6cde6d3b3a7d639fcabb19fb00940bd0cd444563
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Mon Aug 12 15:22:14 2019 -0700

    [DOCS] Put impala_date doc in the right place in the alphabetical order
    
    Change-Id: Ic89535c8948f5476782371f8e7f9848ee2b1a14d
    Reviewed-on: http://gerrit.cloudera.org:8080/14051
    Reviewed-by: Alex Rodoni <ar...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docs/impala.ditamap | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/impala.ditamap b/docs/impala.ditamap
index 2479d72..6a44ac6 100644
--- a/docs/impala.ditamap
+++ b/docs/impala.ditamap
@@ -93,8 +93,8 @@ under the License.
       <topicref href="topics/impala_bigint.xml"/>
       <topicref href="topics/impala_boolean.xml"/>
       <topicref href="topics/impala_char.xml"/>
-      <topicref href="topics/impala_decimal.xml"/>
       <topicref href="topics/impala_date.xml"/>
+      <topicref href="topics/impala_decimal.xml"/>
       <topicref href="topics/impala_double.xml"/>
       <topicref href="topics/impala_float.xml"/>
       <topicref href="topics/impala_int.xml"/>


[impala] 02/03: IMPALA-8766: Undo hadoop-cloud-storage + HWX Nexus

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8094811b5d975e18e20071552f86c2e3f8c0fc8f
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Jul 17 11:51:32 2019 -0700

    IMPALA-8766: Undo hadoop-cloud-storage + HWX Nexus
    
    Previous commits for IMPALA-8766 attempted to use hadoop-cloud-storage
    to satisfy Impala's cloud dependencies (e.g. hadoop-aws, hadoop-azure,
    etc). On builds with USE_CDP_HIVE=true, this adds Knox
    gateway-cloud-bindings. However, the entry for hadoop-cloud-storage
    artifact in the impala.cdp.repo maven repository introduces
    dependencies that are external to that repository. This requires the
    HWX Nexus repository to resolve those dangling dependencies.
    Unfortunately, HWX Nexus ages out old jars, including the ones we
    need.
    
    This stops using hadoop-cloud-storage, and instead adds a direct
    dependency to Knox for USE_CDP_HIVE=true. It disables the HWX Nexus
    repository and leaves a tombstone explaining why.
    
    Testing:
     - Deleted my .m2 directory and rebuilt Impala with USE_CDP_HIVE=true
     - Verified the CLASSPATH still contains the right jars on USE_CDP_HIVE=true
    
    Change-Id: I79a0c2575fc50bbc3b393c150c0bce22258ea1bd
    Reviewed-on: http://gerrit.cloudera.org:8080/14024
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 bin/impala-config.sh  |  1 +
 fe/pom.xml            | 67 ++++++++++++++++++++++-----------------------------
 impala-parent/pom.xml | 29 +++++++++++++---------
 3 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index effddc4..d795748 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -208,6 +208,7 @@ if $USE_CDP_HIVE; then
   # the minicluster
   export IMPALA_HIVE_VERSION=${CDP_HIVE_VERSION}
   export IMPALA_TEZ_VERSION=0.9.1.7.0.0.0-365
+  export IMPALA_KNOX_VERSION=1.0.0.7.0.0.0-365
   export IMPALA_HADOOP_VERSION=${CDP_HADOOP_VERSION}
   export HADOOP_HOME="$CDP_COMPONENTS_HOME/hadoop-${CDP_HADOOP_VERSION}/"
 else
diff --git a/fe/pom.xml b/fe/pom.xml
index f7e0d35..5162b7a 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -91,6 +91,31 @@ under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-azure</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-azure-datalake</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <!-- https://issues.apache.org/jira/browse/HADOOP-14903 -->
+        <exclusion>
+          <groupId>net.minidev</groupId>
+          <artifactId>json-smart</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.ranger</groupId>
       <artifactId>ranger-plugins-common</artifactId>
       <version>${ranger.version}</version>
@@ -925,34 +950,6 @@ under the License.
             </exclusion>
           </exclusions>
         </dependency>
-
-        <!-- CDH profile manually specifies cloud storage dependencies rather
-             than using hadoop-cloud-storage. This is to minimize any disruption
-             to older configurations. -->
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-azure</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-azure-datalake</artifactId>
-          <version>${hadoop.version}</version>
-          <exclusions>
-            <!-- https://issues.apache.org/jira/browse/HADOOP-14903 -->
-            <exclusion>
-              <groupId>net.minidev</groupId>
-              <artifactId>json-smart</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
       </dependencies>
     </profile>
 
@@ -1057,18 +1054,12 @@ under the License.
           <version>3.2.0-m3</version>
           <scope>test</scope>
         </dependency>
-        <!-- The hadoop-cloud-storage artifact gets AWS, Azure, and other cloud
-             storage dependencies. It also incorporates Knox runtime dependencies. -->
+        <!-- IMPALA-8766: Include Knox jars on the classpath -->
         <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-cloud-storage</artifactId>
-          <version>${hadoop.version}</version>
+          <groupId>org.apache.knox</groupId>
+          <artifactId>gateway-cloud-bindings</artifactId>
+          <version>${knox.version}</version>
           <exclusions>
-            <!-- https://issues.apache.org/jira/browse/HADOOP-14903 -->
-            <exclusion>
-              <groupId>net.minidev</groupId>
-              <artifactId>json-smart</artifactId>
-            </exclusion>
             <!-- Impala currently doesn't support GCS, so exclude those jars -->
             <exclusion>
               <groupId>com.google.cloud.bigdataoss</groupId>
diff --git a/impala-parent/pom.xml b/impala-parent/pom.xml
index 4dae057..ab1dbb3 100644
--- a/impala-parent/pom.xml
+++ b/impala-parent/pom.xml
@@ -40,6 +40,7 @@ under the License.
     <hbase.version>${env.IMPALA_HBASE_VERSION}</hbase.version>
     <parquet.version>${env.IMPALA_PARQUET_VERSION}</parquet.version>
     <kite.version>${env.IMPALA_KITE_VERSION}</kite.version>
+    <knox.version>${env.IMPALA_KNOX_VERSION}</knox.version>
     <thrift.version>0.9.3</thrift.version>
     <impala.extdatasrc.api.version>1.0-SNAPSHOT</impala.extdatasrc.api.version>
     <impala.query.event.hook.api.version>1.0-SNAPSHOT</impala.query.event.hook.api.version>
@@ -185,25 +186,31 @@ under the License.
     </repository>
     <repository>
       <!--
-      The impala.cdp.repo above can reference versions that are not in that
-      repository. For example, artifact A at version 280 may have a dependency
-      on artifact B at version 279, but the maven repository may only have
-      artifact B at version 280. This repository contains all the versions, so
-      it satisfies the dangling dependencies. This was necessary for IMPALA-8766.
+      HWX Nexus is disabled. This is a tombstone to list out why:
+      1. Snapshots are disabled because HWX Nexus contains snapshots of CDH artifacts
+      that can conflict with the artifacts in the maven repository associated with
+      the CDH_BUILD_NUMBER. Maven can end up downloading a mix of artifacts that are
+      mutually incompatible. Snapshots are not necessary at this time.
+      2.  In a previous change, we depended on the hadoop-cloud-storage artifact from
+      the impala.cdp.repo. This had the odd property that it referenced versions of
+      artifacts that were not in the impala.cdp.repo. For example, artifact A at
+      version 280 may have a dependency on artifact B at version 279, but the maven
+      repository may only have artifact B at version 280. Nexus was meant to handle
+      these dangling dependencies (see IMPALA-8766). However, HWX Nexus ages out jars
+      from old build numbers. When the artifact B at version 279 ages out, it breaks
+      the build. Just as seriously, if Impala uses the impala.cdp.repo in a way that
+      requires an external maven repository, old commits may become unbuildable if that
+      external maven repository removes any of the jars we need.
+      So, HWX Nexus is disabled and should face strong scrutiny before being reenabled.
       -->
       <id>hwx.public.repo</id>
       <url>https://nexus-private.hortonworks.com/nexus/content/groups/public</url>
       <name>Hortonworks public repository</name>
-      <!--
-      Snapshots are specifically disabled, because the snapshots in this repository
-      would conflict with the versions in impala.cdh.repo and should be unnecessary.
-      -->
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
       <releases>
-        <!-- The Hortonworks public repository is only needed for USE_CDP_HIVE=true -->
-        <enabled>${env.USE_CDP_HIVE}</enabled>
+        <enabled>false</enabled>
       </releases>
     </repository>
   </repositories>