Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/03 04:30:30 UTC
[impala] branch master updated: IMPALA-10036: schedule unpartitioned fragments on random executor
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 521c814 IMPALA-10036: schedule unpartitioned fragments on random executor
521c814 is described below
commit 521c81466f980b95972026a7ac112cfa47a3f854
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sat Aug 1 14:23:12 2020 -0700
IMPALA-10036: schedule unpartitioned fragments on random executor
Some plans have unpartitioned fragments that are not the coordinator
fragment. It would be best to avoid scheduling these on the coordinator,
both to reduce coordinator load and to allow using the dedicated
coordinator mem estimates for these queries.
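For illustration, here is a minimal standalone sketch of the placement rule this
patch adds. The names Executor, FragmentInfo and PickHostForUnpartitionedFragment
are simplified stand-ins invented for this sketch, not the actual Scheduler or
ScheduleState API: the coordinator fragment (or any fragment when no executors are
available) stays on the coordinator, an interior unpartitioned fragment is placed
on the host of a random instance of its first input fragment, and any other
unpartitioned fragment (e.g. one with only a constant UNION) goes to a random
executor.

// Minimal sketch only: Executor and FragmentInfo are simplified stand-ins for
// Impala's BackendDescriptorPB and FragmentScheduleState; this is not the real
// Scheduler API.
#include <iostream>
#include <random>
#include <string>
#include <vector>

struct Executor { std::string address; };

struct FragmentInfo {
  bool is_coord_fragment;                // root fragment that must run on the coordinator
  std::vector<std::string> input_hosts;  // hosts of instances of the first input fragment
};

// Decide where to place the single instance of an unpartitioned fragment.
std::string PickHostForUnpartitionedFragment(const FragmentInfo& fragment,
    const Executor& coordinator, const std::vector<Executor>& executors,
    std::mt19937* rng) {
  if (fragment.is_coord_fragment || executors.empty()) {
    // The coordinator fragment must run on the coordinator; with no executors
    // available, everything falls back to the coordinator as well.
    return coordinator.address;
  }
  if (!fragment.input_hosts.empty()) {
    // Interior unpartitioned fragment: pick a random instance of its first
    // input fragment and run on that instance's host.
    std::uniform_int_distribution<int> dist(0, fragment.input_hosts.size() - 1);
    return fragment.input_hosts[dist(*rng)];
  }
  // No scans and no exchange inputs (e.g. a constant UNION): pick a random executor.
  std::uniform_int_distribution<int> dist(0, executors.size() - 1);
  return executors[dist(*rng)].address;
}

int main() {
  std::mt19937 rng(std::random_device{}());
  Executor coordinator{"coord-host:22000"};
  std::vector<Executor> executors{{"exec1:22000"}, {"exec2:22000"}, {"exec3:22000"}};
  FragmentInfo interior_fragment{false, {"exec2:22000", "exec3:22000"}};
  std::cout << PickHostForUnpartitionedFragment(
                   interior_fragment, coordinator, executors, &rng)
            << std::endl;
}

In the actual patch the random choices draw from a per-schedule std::mt19937
(ScheduleState::rng()), so fragment placement and the executor shuffle in
AssignmentCtx share one generator seeded once per query.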
Testing:
Extended the dedicated coordinator admission control test with
a query with an unpartitioned fragment. The test failed on master
when dedicated coordinator estimates were used.
Manually ran the repro with TPC-DS Q23, which is a more complex
query with unpartitioned fragments, and confirmed that it could
run with low coordinator mem limits.
Ran exhaustive tests.
Change-Id: I1eee78f24b048968f2fdf91f0da95b78552f8d33
Reviewed-on: http://gerrit.cloudera.org:8080/16272
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
---
be/src/scheduling/schedule-state.cc | 6 ++-
be/src/scheduling/schedule-state.h | 6 +++
be/src/scheduling/scheduler-test-util.cc | 9 ++--
be/src/scheduling/scheduler.cc | 63 +++++++++++++++++------
be/src/scheduling/scheduler.h | 6 ++-
tests/custom_cluster/test_admission_controller.py | 59 +++++++++++++--------
6 files changed, 104 insertions(+), 45 deletions(-)
diff --git a/be/src/scheduling/schedule-state.cc b/be/src/scheduling/schedule-state.cc
index d413915..e004f3a 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -73,7 +73,8 @@ ScheduleState::ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest
query_options_(query_options),
query_schedule_pb_(new QuerySchedulePB()),
summary_profile_(summary_profile),
- next_instance_id_(query_id) {
+ next_instance_id_(query_id),
+ rng_(rand()) {
// Init() is not called, this constructor is for white box testing only.
DCHECK(TestInfo::is_test());
}
@@ -281,6 +282,9 @@ void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
coord_backend_mem_to_admit = use_dedicated_coord_estimates ?
GetDedicatedCoordMemoryEstimate() :
GetPerExecutorMemoryEstimate();
+ VLOG(3) << "use_dedicated_coord_estimates=" << use_dedicated_coord_estimates
+ << " coord_backend_mem_to_admit=" << coord_backend_mem_to_admit
+ << " per_backend_mem_to_admit=" << per_backend_mem_to_admit;
if (!mimic_old_behaviour) {
int64_t min_mem_limit_required =
ReservationUtil::GetMinMemLimitFromReservation(largest_min_reservation());
diff --git a/be/src/scheduling/schedule-state.h b/be/src/scheduling/schedule-state.h
index 9da8408..a011d94 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -18,6 +18,7 @@
#pragma once
#include <map>
+#include <random>
#include <string>
#include <unordered_map>
#include <vector>
@@ -294,6 +295,8 @@ class ScheduleState {
void set_executor_group(string executor_group);
+ std::mt19937* rng() { return &rng_; }
+
private:
/// These references are valid for the lifetime of this query schedule because they
/// are all owned by the enclosing QueryExecState.
@@ -339,6 +342,9 @@ class ScheduleState {
/// Scheduler and only valid after scheduling completes successfully.
std::string executor_group_;
+ /// Random number generator used for any randomized decisions during scheduling.
+ std::mt19937 rng_;
+
/// Map from fragment idx to references into the 'request_'.
std::unordered_map<int32_t, const TPlanFragment&> fragments_;
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 2974081..8f0ea1e 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -17,6 +17,8 @@
#include "scheduling/scheduler-test-util.h"
+#include <random>
+
#include <boost/unordered_set.hpp>
#include "common/names.h"
@@ -647,9 +649,10 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
DCHECK(membership_snapshot->local_be_desc.get() != nullptr);
Scheduler::ExecutorConfig executor_config =
{no_executor_group ? empty_group : it->second, *membership_snapshot->local_be_desc};
- return scheduler_->ComputeScanRangeAssignment(executor_config, 0,
- nullptr, false, *locations, plan_.referenced_datanodes(), exec_at_coord,
- plan_.query_options(), nullptr, assignment);
+ std::mt19937 rng(rand());
+ return scheduler_->ComputeScanRangeAssignment(executor_config, 0, nullptr, false,
+ *locations, plan_.referenced_datanodes(), exec_at_coord, plan_.query_options(),
+ nullptr, &rng, assignment);
}
void SchedulerWrapper::AddBackend(const Host& host) {
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 3f55c88..abd28ec 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -194,7 +194,7 @@ Status Scheduler::ComputeScanRangeAssignment(
RETURN_IF_ERROR(
ComputeScanRangeAssignment(executor_config, node_id, node_replica_preference,
node_random_replica, *locations, exec_request.host_list, exec_at_coord,
- state->query_options(), total_assignment_timer, assignment));
+ state->query_options(), total_assignment_timer, state->rng(), assignment));
state->IncNumScanRanges(locations->size());
}
}
@@ -275,20 +275,51 @@ void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
<< fragment_state->fragment.display_name;
CreateCollocatedJoinBuildInstances(fragment_state, state);
} else if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
- // case 1: root fragment instance executed at coordinator
- VLOG(3) << "Computing exec params for coordinator fragment "
- << fragment_state->fragment.display_name;
- const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
- const NetworkAddressPB& coord = local_be_desc.address();
- DCHECK(local_be_desc.has_krpc_address());
- const NetworkAddressPB& krpc_coord = local_be_desc.krpc_address();
- DCHECK(IsResolvedAddress(krpc_coord));
+ // case 1: unpartitioned fragment - either the coordinator fragment at the root of
+ // the plan that must be executed at the coordinator or an unpartitioned fragment
+ // that can be executed anywhere.
+ VLOG(3) << "Computing exec params for "
+ << (fragment_state->is_coord_fragment ? "coordinator" : "unpartitioned")
+ << " fragment " << fragment_state->fragment.display_name;
+ NetworkAddressPB host;
+ NetworkAddressPB krpc_host;
+ if (fragment_state->is_coord_fragment || executor_config.group.NumExecutors() == 0) {
+ // The coordinator fragment must be scheduled on the coordinator. Otherwise if
+ // no executors are available, we need to schedule on the coordinator.
+ const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
+ host = local_be_desc.address();
+ DCHECK(local_be_desc.has_krpc_address());
+ krpc_host = local_be_desc.krpc_address();
+ } else if (fragment_state->exchange_input_fragments.size() > 0) {
+ // Interior unpartitioned fragments can be scheduled on an arbitrary executor.
+ // Pick a random instance from the first input fragment.
+ const FragmentScheduleState& input_fragment_state =
+ *state->GetFragmentScheduleState(fragment_state->exchange_input_fragments[0]);
+ int num_input_instances = input_fragment_state.instance_states.size();
+ int instance_idx =
+ std::uniform_int_distribution<int>(0, num_input_instances - 1)(*state->rng());
+ host = input_fragment_state.instance_states[instance_idx].host;
+ krpc_host = input_fragment_state.instance_states[instance_idx].krpc_host;
+ } else {
+ // Other fragments, e.g. ones with only a constant union or empty set, are scheduled
+ // on a random executor.
+ vector<BackendDescriptorPB> all_executors =
+ executor_config.group.GetAllExecutorDescriptors();
+ int idx = std::uniform_int_distribution<int>(0, all_executors.size() - 1)(
+ *state->rng());
+ const BackendDescriptorPB& be_desc = all_executors[idx];
+ host = be_desc.address();
+ DCHECK(be_desc.has_krpc_address());
+ krpc_host = be_desc.krpc_address();
+ }
+ VLOG(3) << "Scheduled unpartitioned fragment on " << krpc_host;
+ DCHECK(IsResolvedAddress(krpc_host));
// make sure that the coordinator instance ends up with instance idx 0
UniqueIdPB instance_id = fragment_state->is_coord_fragment ?
state->query_id() :
state->GetNextInstanceId();
fragment_state->instance_states.emplace_back(
- instance_id, coord, krpc_coord, 0, *fragment_state);
+ instance_id, host, krpc_host, 0, *fragment_state);
FInstanceScheduleState& instance_state = fragment_state->instance_states.back();
*fragment_state->exec_params->add_instances() = instance_id;
@@ -524,7 +555,7 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
bool node_random_replica, const vector<TScanRangeLocationList>& locations,
const vector<TNetworkAddress>& host_list, bool exec_at_coord,
- const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
+ const TQueryOptions& query_options, RuntimeProfile::Counter* timer, std::mt19937* rng,
FragmentScanRangeAssignment* assignment) {
const ExecutorGroup& executor_group = executor_config.group;
if (executor_group.NumExecutors() == 0 && !exec_at_coord) {
@@ -557,9 +588,8 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
coord_only_executor_group.AddExecutor(local_be_desc);
VLOG_ROW << "Exec at coord is " << (exec_at_coord ? "true" : "false");
- AssignmentCtx assignment_ctx(
- exec_at_coord ? coord_only_executor_group : executor_group, total_assignments_,
- total_local_assignments_);
+ AssignmentCtx assignment_ctx(exec_at_coord ? coord_only_executor_group : executor_group,
+ total_assignments_, total_local_assignments_, rng);
// Holds scan ranges that must be assigned for remote reads.
vector<const TScanRangeLocationList*> remote_scan_range_locations;
@@ -863,15 +893,14 @@ void Scheduler::ComputeBackendExecParams(
}
Scheduler::AssignmentCtx::AssignmentCtx(const ExecutorGroup& executor_group,
- IntCounter* total_assignments, IntCounter* total_local_assignments)
+ IntCounter* total_assignments, IntCounter* total_local_assignments, std::mt19937* rng)
: executor_group_(executor_group),
first_unused_executor_idx_(0),
total_assignments_(total_assignments),
total_local_assignments_(total_local_assignments) {
DCHECK_GT(executor_group.NumExecutors(), 0);
random_executor_order_ = executor_group.GetAllExecutorIps();
- std::mt19937 g(rand());
- std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), g);
+ std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), *rng);
// Initialize inverted map for executor rank lookups
int i = 0;
for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = i++;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4a9be5d..d9abb03 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -17,6 +17,7 @@
#pragma once
+#include <random>
#include <string>
#include <vector>
#include <boost/heap/binomial_heap.hpp>
@@ -151,7 +152,7 @@ class Scheduler {
class AssignmentCtx {
public:
AssignmentCtx(const ExecutorGroup& executor_group, IntCounter* total_assignments,
- IntCounter* total_local_assignments);
+ IntCounter* total_local_assignments, std::mt19937* rng);
/// Among hosts in 'data_locations', select the one with the minimum number of
/// assigned bytes. If executors have been assigned equal amounts of work and
@@ -344,13 +345,14 @@ class Scheduler {
/// exec_at_coord: Whether to schedule all scan ranges on the coordinator.
/// query_options: Query options for the current query.
/// timer: Tracks execution time of ComputeScanRangeAssignment.
+ /// rng: Random number generator used for any random decisions
/// assignment: Output parameter, to which new assignments will be added.
Status ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
bool node_random_replica, const std::vector<TScanRangeLocationList>& locations,
const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
- FragmentScanRangeAssignment* assignment);
+ std::mt19937* rng, FragmentScanRangeAssignment* assignment);
/// Computes execution parameters for all backends assigned in the query and always one
/// for the coordinator backend since it participates in execution regardless. Must be
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index d463913..a84862b 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -64,6 +64,15 @@ LOG = logging.getLogger('admission_test')
# that running queries can be correlated with the thread that submitted them.
QUERY = " union all ".join(["select * from functional.alltypesagg where id != {0}"] * 30)
+# Same query but with additional unpartitioned non-coordinator fragments.
+# The unpartitioned fragments are both interior fragments that consume input
+# from a scan fragment and non-interior fragments with a constant UNION.
+QUERY_WITH_UNPARTITIONED_FRAGMENTS = """
+ select *, (select count(distinct int_col) from alltypestiny) subquery1,
+ (select count(distinct int_col) from alltypes) subquery2,
+ (select 1234) subquery3
+ from (""" + QUERY + """) v"""
+
# The statestore heartbeat and topic update frequency (ms). Set low for testing.
STATESTORE_RPC_FREQUENCY_MS = 100
@@ -575,28 +584,34 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
'using_dedicated_coord_estimates' is true."""
self.client.set_configuration_option('request_pool', "root.regularPool")
ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
- handle = self.client.execute_async(QUERY.format(1))
- self.client.wait_for_finished_timeout(handle, 1000)
- expected_mem_limits = self.__get_mem_limits_admission_debug_page()
- actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
- mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
- debug_string = " expected_mem_limits:" + str(
- expected_mem_limits) + " actual_mem_limits:" + str(
- actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
- MB = 1 << 20
- # Easiest way to check float in-equality.
- assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
- 'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
- # There may be some rounding errors so keep a margin of 5MB when verifying
- assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
- 'coordinator']) < 5 * MB, debug_string
- assert abs(actual_mem_limits['executor'] - expected_mem_limits[
- 'executor']) < 5 * MB, debug_string
- assert abs(mem_admitted['coordinator'] - expected_mem_limits[
- 'coordinator']) < 5 * MB, debug_string
- assert abs(
- mem_admitted['executor'][0] - expected_mem_limits['executor']) < 5 * MB, \
- debug_string
+ # Use a test query that has unpartitioned non-coordinator fragments to make
+ # sure those are handled correctly (IMPALA-10036).
+ for query in [QUERY, QUERY_WITH_UNPARTITIONED_FRAGMENTS]:
+ handle = self.client.execute_async(query.format(1))
+ self.client.wait_for_finished_timeout(handle, 1000)
+ expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+ actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
+ mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+ debug_string = " expected_mem_limits:" + str(
+ expected_mem_limits) + " actual_mem_limits:" + str(
+ actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
+ MB = 1 << 20
+ # Easiest way to check float in-equality.
+ assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
+ 'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
+ # There may be some rounding errors so keep a margin of 5MB when verifying
+ assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
+ 'coordinator']) < 5 * MB, debug_string
+ assert abs(actual_mem_limits['executor'] - expected_mem_limits[
+ 'executor']) < 5 * MB, debug_string
+ assert abs(mem_admitted['coordinator'] - expected_mem_limits[
+ 'coordinator']) < 5 * MB, debug_string
+ assert abs(
+ mem_admitted['executor'][0] - expected_mem_limits['executor']) < 5 * MB, \
+ debug_string
+ # Ensure all fragments finish executing before running next query.
+ self.client.fetch(query, handle)
+ self.client.close_query(handle)
def __get_mem_limits_admission_debug_page(self):
"""Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the