You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/13 03:12:03 UTC
[impala] 03/03: Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2df3b8cf82af66199f5851c84f3aa065577f6d7d
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Aug 12 17:57:32 2019 -0700
Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"
This reverts commit 760169edcbca438c5964380a604b6c271c6bd1a3.
Change-Id: Id20cf3581995f450de6f491e7874cbcf23b52cda
Reviewed-on: http://gerrit.cloudera.org:8080/14052
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
---
be/src/runtime/coordinator-backend-state.cc | 15 +-----
be/src/runtime/coordinator-backend-state.h | 8 +--
be/src/runtime/coordinator.cc | 12 +++--
be/src/runtime/coordinator.h | 4 +-
be/src/scheduling/admission-controller-test.cc | 33 ++++--------
be/src/scheduling/admission-controller.cc | 61 ++++++++++++----------
be/src/scheduling/query-schedule.cc | 57 +++++++++-----------
be/src/scheduling/query-schedule.h | 43 ++++++++-------
be/src/scheduling/scheduler.cc | 15 +++---
be/src/scheduling/scheduler.h | 5 +-
.../java/org/apache/impala/planner/Planner.java | 16 +++---
.../QueryTest/dedicated-coord-mem-estimates.test | 8 +--
12 files changed, 123 insertions(+), 154 deletions(-)
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 60e67c1..028dd70 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -69,8 +69,8 @@ void Coordinator::BackendState::Init(
const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
backend_exec_params_ = &exec_params;
- host_ = backend_exec_params_->be_desc.address;
- krpc_host_ = backend_exec_params_->be_desc.krpc_address;
+ host_ = backend_exec_params_->instance_params[0]->host;
+ krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
num_remaining_instances_ = backend_exec_params_->instance_params.size();
host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
@@ -187,14 +187,6 @@ void Coordinator::BackendState::Exec(
last_report_time_ms_ = GenerateReportTimestamp();
exec_complete_barrier->Notify();
});
-
- // Don not issue an ExecQueryFInstances RPC if there are no fragment instances
- // scheduled to run on this backend.
- if (IsEmptyBackend()) {
- DCHECK(backend_exec_params_->is_coord_backend);
- return;
- }
-
std::unique_ptr<ControlServiceProxy> proxy;
Status get_proxy_status = ControlService::GetProxy(krpc_host_, host_.hostname, &proxy);
if (!get_proxy_status.ok()) {
@@ -357,7 +349,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
const ReportExecStatusRequestPB& backend_exec_status,
const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
- DCHECK(!IsEmptyBackend());
// Hold the exec_summary's lock to avoid exposing it half-way through
// the update loop below.
lock_guard<SpinLock> l1(exec_summary->lock);
@@ -462,7 +453,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
void Coordinator::BackendState::UpdateHostProfile(
const TRuntimeProfileTree& thrift_profile) {
- DCHECK(!IsEmptyBackend());
host_profile_->Update(thrift_profile);
}
@@ -544,7 +534,6 @@ bool Coordinator::BackendState::Cancel() {
}
void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
- DCHECK(!IsEmptyBackend());
DCHECK(rpc_params.dst_query_id == query_id());
// If the backend is already done, it's not waiting for this filter, so we skip
// sending it in this case.
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 4b0cfad..4c41c8c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -52,9 +52,7 @@ class ExecSummary;
struct FInstanceExecParams;
/// This class manages all aspects of the execution of all fragment instances of a
-/// single query on a particular backend. For the coordinator backend its possible to have
-/// no fragment instances scheduled on it. In that case, no RPCs are issued and the
-/// Backend state transitions to 'done' state right after Exec() is called on it.
+/// single query on a particular backend.
/// Thread-safe unless pointed out otherwise.
class Coordinator::BackendState {
public:
@@ -74,7 +72,6 @@ class Coordinator::BackendState {
/// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
/// communicated through GetStatus(). Uses filter_routing_table to remove filters
/// that weren't selected during its construction.
- /// No RPC is issued if there are no fragment instances scheduled on this backend.
/// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
/// on their node_id/instance_idx.
void Exec(const DebugOptions& debug_options,
@@ -162,9 +159,6 @@ class Coordinator::BackendState {
/// Returns a timestamp using monotonic time for tracking arrival of status reports.
static int64_t GenerateReportTimestamp() { return MonotonicMillis(); }
- /// Returns True if there are no fragment instances scheduled on this backend.
- bool IsEmptyBackend() { return backend_exec_params_->instance_params.empty(); }
-
private:
/// Execution stats for a single fragment instance.
/// Not thread-safe.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4df15d9..bd4fb75 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,8 +111,14 @@ Status Coordinator::Exec() {
const string& str = Substitute("Query $0", PrintId(query_id()));
progress_.Init(str, schedule_.num_scan_ranges());
+ // If there is no coord fragment then pick the mem limit computed for an executor.
+ // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
+ // fragments are scheduled to run on the coordinator backend.
+ int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
+ schedule_.coord_backend_mem_limit() :
+ schedule_.per_backend_mem_limit();
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
- query_ctx(), schedule_.coord_backend_mem_limit());
+ query_ctx(), coord_mem_limit);
filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
-1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
@@ -405,10 +411,6 @@ Status Coordinator::FinishBackendStartup() {
max_latency_host = TNetworkAddressToString(backend_state->impalad_address());
}
latencies.Update(backend_state->rpc_latency());
- // Mark backend complete if no fragment instances were assigned to it.
- if (backend_state->IsEmptyBackend()) {
- backend_exec_complete_barrier_->Notify();
- }
}
query_profile_->AddInfoString(
"Backend startup latencies", latencies.ToHumanReadable());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 72ab3af..9098dd0 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -482,9 +482,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Helper for Exec(). Checks for errors encountered when starting backend execution,
/// using any non-OK status, if any, as the overall status. Returns the overall
- /// status. Also updates query_profile_ with the startup latency histogram and the
- /// backend_exec_complete_barrier_ if there is any backend which is already done (only
- /// possible at this point if no fragment instances were assigned to it).
+ /// status. Also updates query_profile_ with the startup latency histogram.
Status FinishBackendStartup() WARN_UNUSED_RESULT;
/// Build the filter routing table by iterating over all plan nodes and collecting the
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 17c8457..a241525 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -97,10 +97,11 @@ class AdmissionControllerTest : public testing::Test {
TQueryOptions* query_options = pool_.Add(new TQueryOptions());
query_options->__set_mem_limit(mem_limit);
QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
- *query_id, *request, *query_options, profile));
+ *query_id, *request, *query_options, is_dedicated_coord, profile));
query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
- SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord);
query_schedule->UpdateMemoryRequirements(config);
+
+ SetHostsInQuerySchedule(*query_schedule, num_hosts);
return query_schedule;
}
@@ -112,24 +113,14 @@ class AdmissionControllerTest : public testing::Test {
}
/// Replace the per-backend hosts in the schedule with one having 'count' hosts.
- /// Note: no FInstanceExecParams are added so
- /// QuerySchedule::UseDedicatedCoordEstimates() would consider this schedule as not
- /// having anything scheduled on the backend which would result in always returning true
- /// if a dedicated coordinator backend exists.
void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
- bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
- int64_t admit_mem_limit = 200L * MEGABYTE) {
+ int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
for (int i = 0; i < count; ++i) {
BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+ backend_exec_params->admit_mem_limit = admit_mem_limit;
backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
- backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit);
- backend_exec_params->be_desc.__set_is_executor(true);
- if (i == 0) {
- // Add first element as the coordinator.
- backend_exec_params->is_coord_backend = true;
- backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord);
- }
+ if (i == 0) backend_exec_params->contains_coord_fragment = true;
const string host_name = Substitute("host$0", i);
per_backend_exec_params->emplace(
MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -488,7 +479,7 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
// Adjust the QuerySchedule to have minimum memory reservation of 45MB.
// This will be rejected immediately as minimum memory reservation is too high.
- SetHostsInQuerySchedule(*query_schedule, host_count, false, 45L * MEGABYTE);
+ SetHostsInQuerySchedule(*query_schedule, host_count, 45L * MEGABYTE);
string rejected_reserved_reason;
ASSERT_TRUE(admission_controller->RejectForSchedule(
*query_schedule, config_d, host_count, host_count, &rejected_reserved_reason));
@@ -808,19 +799,17 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
// Add coordinator backend.
BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
- coord_exec_params->is_coord_backend = true;
+ coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
+ coord_exec_params->contains_coord_fragment = true;
coord_exec_params->thread_reservation = 1;
- coord_exec_params->be_desc.__set_admit_mem_limit(512 * MEGABYTE);
- coord_exec_params->be_desc.__set_is_executor(false);
const string coord_host_name = Substitute("host$0", 1);
TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
const string coord_host = TNetworkAddressToString(coord_addr);
per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
// Add executor backend.
BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+ backend_exec_params->admit_mem_limit = GIGABYTE;
backend_exec_params->thread_reservation = 1;
- backend_exec_params->be_desc.__set_admit_mem_limit(GIGABYTE);
- backend_exec_params->be_desc.__set_is_executor(true);
const string exec_host_name = Substitute("host$0", 2);
TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
const string exec_host = TNetworkAddressToString(exec_addr);
@@ -864,7 +853,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
// Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
// coord's admit_mem_limit is less than mem_to_admit on the coord.
// Re-using previous QuerySchedule object.
- coord_exec_params->be_desc.__set_admit_mem_limit(100 * MEGABYTE);
+ coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
(*per_backend_exec_params)[coord_addr] = *coord_exec_params;
query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
ASSERT_TRUE(admission_controller->RejectForSchedule(
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 666659b..d31c50b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,6 +56,8 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
"capture most cases where the Impala daemon is disconnected from the statestore "
"or topic updates are seriously delayed.");
+DECLARE_bool(is_executor);
+
namespace impala {
const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -396,7 +398,7 @@ void AdmissionController::UpdateHostStats(
int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
for (const auto& entry : schedule.per_backend_exec_params()) {
const TNetworkAddress& host_addr = entry.first;
- int64_t mem_to_admit = entry.second.is_coord_backend ?
+ int64_t mem_to_admit = entry.second.contains_coord_fragment ?
schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
if (!is_admitting) mem_to_admit *= -1;
const string host = TNetworkAddressToString(host_addr);
@@ -439,8 +441,9 @@ bool AdmissionController::CanAccommodateMaxInitialReservation(
const int64_t coord_min_reservation = schedule.coord_min_reservation();
return CanMemLimitAccommodateReservation(
executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
- && CanMemLimitAccommodateReservation(
- coord_mem_limit, coord_min_reservation, mem_unavailable_reason);
+ && (!schedule.requiresCoordinatorFragment()
+ || CanMemLimitAccommodateReservation(
+ coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
}
bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -480,11 +483,11 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
for (const auto& entry : schedule.per_backend_exec_params()) {
const TNetworkAddress& host = entry.first;
const string host_id = TNetworkAddressToString(host);
- int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit;
+ int64_t admit_mem_limit = entry.second.admit_mem_limit;
const HostStats& host_stats = host_stats_[host_id];
int64_t mem_reserved = host_stats.mem_reserved;
int64_t mem_admitted = host_stats.mem_admitted;
- int64_t mem_to_admit = entry.second.is_coord_backend ?
+ int64_t mem_to_admit = entry.second.contains_coord_fragment ?
coord_mem_to_admit : executor_mem_to_admit;
VLOG_ROW << "Checking memory on host=" << host_id
<< " mem_reserved=" << PrintBytes(mem_reserved)
@@ -516,7 +519,7 @@ bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
for (const auto& entry : schedule.per_backend_exec_params()) {
const TNetworkAddress& host = entry.first;
const string host_id = TNetworkAddressToString(host);
- int64_t admit_num_queries_limit = entry.second.be_desc.admit_num_queries_limit;
+ int64_t admit_num_queries_limit = entry.second.admit_num_queries_limit;
int64_t num_admitted = host_stats_[host_id].num_admitted;
VLOG_ROW << "Checking available slot on host=" << host_id
<< " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
@@ -637,21 +640,21 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
nullptr, std::numeric_limits<int64_t>::max());
int64_t cluster_thread_reservation = 0;
for (const auto& e : schedule.per_backend_exec_params()) {
- const BackendExecParams& bp = e.second;
- cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
- if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
- largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
+ cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
+ if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+ largest_min_mem_reservation =
+ make_pair(&e.first, e.second.min_mem_reservation_bytes);
}
- cluster_thread_reservation += bp.thread_reservation;
- if (bp.thread_reservation > max_thread_reservation.second) {
- max_thread_reservation = make_pair(&e.first, bp.thread_reservation);
+ cluster_thread_reservation += e.second.thread_reservation;
+ if (e.second.thread_reservation > max_thread_reservation.second) {
+ max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
}
- if (bp.is_coord_backend) {
+ if (e.second.contains_coord_fragment) {
coord_admit_mem_limit.first = &e.first;
- coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
- } else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) {
+ coord_admit_mem_limit.second = e.second.admit_mem_limit;
+ } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
min_executor_admit_mem_limit.first = &e.first;
- min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
+ min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
}
}
@@ -721,15 +724,17 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
TNetworkAddressToString(*min_executor_admit_mem_limit.first));
return true;
}
- int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
- VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
- << coord_mem_to_admit
- << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
- if (coord_mem_to_admit > coord_admit_mem_limit.second) {
- *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
- PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
- TNetworkAddressToString(*coord_admit_mem_limit.first));
- return true;
+ if (schedule.requiresCoordinatorFragment()) {
+ int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+ VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+ << coord_mem_to_admit
+ << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+ if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+ *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+ PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+ TNetworkAddressToString(*coord_admit_mem_limit.first));
+ return true;
+ }
}
}
return false;
@@ -1143,9 +1148,11 @@ Status AdmissionController::ComputeGroupSchedules(
for (const ExecutorGroup* executor_group : executor_groups) {
DCHECK(executor_group->IsHealthy());
DCHECK_GT(executor_group->NumExecutors(), 0);
+ bool is_dedicated_coord = !FLAGS_is_executor;
unique_ptr<QuerySchedule> group_schedule =
make_unique<QuerySchedule>(request.query_id, request.request,
- request.query_options, request.summary_profile, request.query_events);
+ request.query_options, is_dedicated_coord, request.summary_profile,
+ request.query_events);
const string& group_name = executor_group->name();
VLOG(3) << "Scheduling for executor group: " << group_name << " with "
<< executor_group->NumExecutors() << " executors";
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 86fe8fd..fceee52 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -43,26 +43,29 @@ DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
namespace impala {
QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile,
- RuntimeProfile::EventSequence* query_events)
+ const TQueryOptions& query_options, bool is_dedicated_coord,
+ RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
: query_id_(query_id),
request_(request),
query_options_(query_options),
summary_profile_(summary_profile),
query_events_(query_events),
num_scan_ranges_(0),
- next_instance_id_(query_id) {
+ next_instance_id_(query_id),
+ is_dedicated_coord_(is_dedicated_coord) {
Init();
}
QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+ const TQueryOptions& query_options, bool is_dedicated_coord,
+ RuntimeProfile* summary_profile)
: query_id_(query_id),
request_(request),
query_options_(query_options),
summary_profile_(summary_profile),
num_scan_ranges_(0),
- next_instance_id_(query_id) {
+ next_instance_id_(query_id),
+ is_dedicated_coord_(is_dedicated_coord) {
// Init() is not called, this constructor is for white box testing only.
DCHECK(TestInfo::is_test());
}
@@ -86,7 +89,7 @@ void QuerySchedule::Init() {
// mark coordinator fragment
const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
- if (RequiresCoordinatorFragment()) {
+ if (requiresCoordinatorFragment()) {
fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
// the coordinator instance gets index 0, generated instance ids start at 1
next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -179,11 +182,7 @@ void QuerySchedule::Validate() const {
}
}
}
-
- for (const auto& elem: per_backend_exec_params_) {
- const BackendExecParams& bp = elem.second;
- DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
- }
+ // TODO: add validation for BackendExecParams
}
int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
@@ -224,7 +223,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
}
const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
- DCHECK(RequiresCoordinatorFragment());
+ DCHECK(requiresCoordinatorFragment());
const TPlanFragment& coord_fragment = request_.plan_exec_info[0].fragments[0];
const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -248,25 +247,20 @@ int QuerySchedule::GetNumFragmentInstances() const {
}
int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
- // There will always be an entry for the coordinator in per_backend_exec_params_.
- return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
- + coord_backend_mem_to_admit();
+ if (!requiresCoordinatorFragment()) {
+ // For this case, there will be no coordinator fragment so only use the per
+ // executor mem to admit while accounting for admitted memory. This will also ensure
+ // the per backend mem admitted accounting is consistent with the cluster-wide mem
+ // admitted.
+ return per_backend_mem_to_admit() * per_backend_exec_params_.size();
+ } else {
+ return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+ + coord_backend_mem_to_admit();
+ }
}
bool QuerySchedule::UseDedicatedCoordEstimates() const {
- for (auto& itr : per_backend_exec_params_) {
- if (!itr.second.is_coord_backend) continue;
- auto& coord = itr.second;
- bool is_dedicated_coord = !coord.be_desc.is_executor;
- bool only_coord_fragment_scheduled =
- RequiresCoordinatorFragment() && coord.instance_params.size() == 1;
- bool no_fragment_scheduled = coord.instance_params.size() == 0;
- return FLAGS_use_dedicated_coordinator_estimates && is_dedicated_coord
- && (only_coord_fragment_scheduled || no_fragment_scheduled);
- }
- DCHECK(false)
- << "Coordinator backend should always have a entry in per_backend_exec_params_";
- return false;
+ return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
}
void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
@@ -275,7 +269,6 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
// mem_limit if it is set in the query options, else sets it to -1 (no limit).
bool mimic_old_behaviour =
pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
- bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
per_backend_mem_to_admit_ = 0;
coord_backend_mem_to_admit_ = 0;
@@ -288,7 +281,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
if (!has_query_option) {
per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
- coord_backend_mem_to_admit_ = use_dedicated_coord_estimates ?
+ coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
GetDedicatedCoordMemoryEstimate() :
GetPerExecutorMemoryEstimate();
if (!mimic_old_behaviour) {
@@ -306,7 +299,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
if (pool_cfg.min_query_mem_limit > 0) {
per_backend_mem_to_admit_ =
max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
- if (!use_dedicated_coord_estimates || has_query_option) {
+ if (!UseDedicatedCoordEstimates() || has_query_option) {
// The minimum mem limit option does not apply to dedicated coordinators -
// this would result in over-reserving of memory. Treat coordinator and
// executor mem limits the same if the query option was explicitly set.
@@ -328,7 +321,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
// If the query is only scheduled to run on the coordinator.
- if (per_backend_exec_params_.size() == 1 && RequiresCoordinatorFragment()) {
+ if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
per_backend_mem_to_admit_ = 0;
}
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6725774..ad169c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -45,9 +45,7 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
FragmentScanRangeAssignment;
-/// Execution parameters for a single backend. This gets created for every backend that
-/// participates in query execution, which includes, every backend that has fragments
-/// scheduled on it and the coordinator backend. Computed by Scheduler::Schedule(), set
+/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
/// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
/// AdmissionController and a BackendState.
struct BackendExecParams {
@@ -56,8 +54,6 @@ struct BackendExecParams {
/// The fragment instance params assigned to this backend. All instances of a
/// particular fragment are contiguous in this vector. Query lifetime;
/// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
- /// This can be empty only for the coordinator backend, that is, if 'is_coord_backend'
- /// is true.
std::vector<const FInstanceExecParams*> instance_params;
// The minimum query-wide buffer reservation size (in bytes) required for this backend.
@@ -78,8 +74,16 @@ struct BackendExecParams {
// concurrently-executing fragment instances at any point in query execution.
int64_t thread_reservation = 0;
- // Indicates whether this backend is the coordinator.
- bool is_coord_backend = false;
+ // The maximum bytes of memory that can be admitted to this backend by the
+ // admission controller. Obtained from the scheduler's executors configuration
+ // which is updated by membership updates from the statestore.
+ int64_t admit_mem_limit = 0;
+
+ // The maximum number of queries that this backend can execute concurrently.
+ int64_t admit_num_queries_limit = 0;
+
+ // Indicates whether this backend will run the coordinator fragment.
+ bool contains_coord_fragment = false;
};
/// Map from an impalad host address to the list of assigned fragment instance params.
@@ -158,17 +162,18 @@ struct FragmentExecParams {
class QuerySchedule {
public:
QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+ const TQueryOptions& query_options, bool is_dedicated_coord,
+ RuntimeProfile* summary_profile,
RuntimeProfile::EventSequence* query_events);
/// For testing only: build a QuerySchedule object but do not run Init().
QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+ const TQueryOptions& query_options, bool is_dedicated_coord,
+ RuntimeProfile* summary_profile);
/// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
/// - all fragments have a FragmentExecParams
/// - all scan ranges are assigned
- /// - all BackendExecParams have instances assigned except for coordinator.
void Validate() const;
const TUniqueId& query_id() const { return query_id_; }
@@ -282,11 +287,6 @@ class QuerySchedule {
/// Returns true if coordinator estimates calculated by the planner and specialized for
/// dedicated a coordinator are to be used for estimating memory requirements.
- /// This happens when the following conditions are true:
- /// 1. Coordinator fragment is scheduled on a dedicated coordinator
- /// 2. Either only the coordinator fragment or no fragments are scheduled on the
- /// coordinator backend. This essentially means that no executor fragments are scheduled
- /// on the coordinator backend.
bool UseDedicatedCoordEstimates() const;
/// Populates or updates the per host query memory limit and the amount of memory to be
@@ -299,6 +299,11 @@ class QuerySchedule {
void set_executor_group(string executor_group);
+ /// Returns true if a coordinator fragment is required based on the query stmt type.
+ bool requiresCoordinatorFragment() const {
+ return request_.stmt_type == TStmtType::QUERY;
+ }
+
private:
/// These references are valid for the lifetime of this query schedule because they
/// are all owned by the enclosing QueryExecState.
@@ -361,6 +366,9 @@ class QuerySchedule {
/// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
int64_t coord_min_reservation_ = 0;
+ /// Indicates whether coordinator fragment will be running on a dedicated coordinator.
+ bool is_dedicated_coord_ = false;
+
/// The name of the executor group that this schedule was computed for. Set by the
/// Scheduler and only valid after scheduling completes successfully.
string executor_group_;
@@ -369,11 +377,6 @@ class QuerySchedule {
/// Sets is_coord_fragment and input_fragments.
/// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
void Init();
-
- /// Returns true if a coordinator fragment is required based on the query stmt type.
- bool RequiresCoordinatorFragment() const {
- return request_.stmt_type == TStmtType::QUERY;
- }
};
}
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 467694b..b5e6807 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,22 +673,21 @@ void Scheduler::ComputeBackendExecParams(
be_params.initial_mem_reservation_total_claims +=
f.fragment.initial_mem_reservation_total_claims;
be_params.thread_reservation += f.fragment.thread_reservation;
+ if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
}
}
- // This also ensures an entry always exists for the coordinator backend.
- int64_t coord_min_reservation = 0;
- const TNetworkAddress& coord_addr = executor_config.local_be_desc.address;
- BackendExecParams& coord_be_params = per_backend_params[coord_addr];
- coord_be_params.is_coord_backend = true;
- coord_min_reservation = coord_be_params.min_mem_reservation_bytes;
-
int64_t largest_min_reservation = 0;
+ int64_t coord_min_reservation = 0;
for (auto& backend : per_backend_params) {
const TNetworkAddress& host = backend.first;
const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
+ backend.second.admit_mem_limit = be_desc.admit_mem_limit;
+ backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
backend.second.be_desc = be_desc;
- if (!backend.second.is_coord_backend) {
+ if (backend.second.contains_coord_fragment) {
+ coord_min_reservation = backend.second.min_mem_reservation_bytes;
+ } else {
largest_min_reservation =
max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
}
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 34ac10d..01ebb3f 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -343,9 +343,8 @@ class Scheduler {
const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
FragmentScanRangeAssignment* assignment);
- /// Computes BackendExecParams for all backends assigned in the query and always one for
- /// the coordinator backend since it participates in execution regardless. Must be
- /// called after ComputeFragmentExecParams().
+ /// Computes BackendExecParams for all backends assigned in the query. Must be called
+ /// after ComputeFragmentExecParams().
void ComputeBackendExecParams(
const ExecutorConfig& executor_config, QuerySchedule* schedule);
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index dbcff87..b9d1f30 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -441,16 +441,12 @@ public class Planner {
maxPerHostPeakResources.getMinMemReservationBytes());
request.setMax_per_host_thread_reservation(
maxPerHostPeakResources.getThreadReservation());
- if (getAnalysisResult().isQueryStmt()) {
- request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
- .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
- DEDICATED_COORD_SAFETY_BUFFER_BYTES));
- } else {
- // For queries that don't have a coordinator fragment, estimate a small
- // amount of memory that the query state spwaned on the coordinator can use.
- request.setDedicated_coord_mem_estimate(totalRuntimeFilterMemBytes +
- DEDICATED_COORD_SAFETY_BUFFER_BYTES);
- }
+ // Assuming the root fragment will always run on the coordinator backend, which
+ // might not be true for queries that don't have a coordinator fragment
+ // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
+ request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+ .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+ DEDICATED_COORD_SAFETY_BUFFER_BYTES));
if (LOG.isTraceEnabled()) {
LOG.trace("Max per-host min reservation: " +
maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
index 39b505f..cd53dac 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -4,8 +4,8 @@
create table test as select id from functional.alltypes where id > 1
---- RUNTIME_PROFILE
row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
-row_regex: .*Cluster Memory Admitted: 132.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
====
---- QUERY
# Truncate table to run the following inserts.
@@ -24,8 +24,8 @@ row_regex: .*Cluster Memory Admitted: 10.00 MB.*
insert into test select id from functional.alltypes where id > 3
---- RUNTIME_PROFILE
row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
-row_regex: .*Cluster Memory Admitted: 132.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
====
---- QUERY
# SELECT with merging exchange (i.e. order by).