You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/03 04:30:30 UTC

[impala] branch master updated: IMPALA-10036: schedule unpartitioned fragments on random executor

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 521c814  IMPALA-10036: schedule unpartitioned fragments on random executor
521c814 is described below

commit 521c81466f980b95972026a7ac112cfa47a3f854
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sat Aug 1 14:23:12 2020 -0700

    IMPALA-10036: schedule unpartitioned fragments on random executor
    
    Some plans have unpartitioned fragments that are not the coordinator
    fragment. It would be best to avoid scheduling these on a coordinator,
    both to reduce coordinator load and to allow using the coordinator
    mem estimates for these queries.
    
    Testing:
    Extended the dedicated coordinator admission control test with
    a query with an unpartitioned fragment. The test failed on master
    with dedicated coordinator estimates.
    
    Manually ran the repro with TPC-DS Q23, which is a more complex
    query with unpartitioned fragments, and confirmed that it could
    run with low coordinator mem limits.
    
    Ran exhaustive tests.
    
    Change-Id: I1eee78f24b048968f2fdf91f0da95b78552f8d33
    Reviewed-on: http://gerrit.cloudera.org:8080/16272
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/scheduling/schedule-state.cc               |  6 ++-
 be/src/scheduling/schedule-state.h                |  6 +++
 be/src/scheduling/scheduler-test-util.cc          |  9 ++--
 be/src/scheduling/scheduler.cc                    | 63 +++++++++++++++++------
 be/src/scheduling/scheduler.h                     |  6 ++-
 tests/custom_cluster/test_admission_controller.py | 59 +++++++++++++--------
 6 files changed, 104 insertions(+), 45 deletions(-)

diff --git a/be/src/scheduling/schedule-state.cc b/be/src/scheduling/schedule-state.cc
index d413915..e004f3a 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -73,7 +73,8 @@ ScheduleState::ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest
     query_options_(query_options),
     query_schedule_pb_(new QuerySchedulePB()),
     summary_profile_(summary_profile),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    rng_(rand()) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -281,6 +282,9 @@ void ScheduleState::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     coord_backend_mem_to_admit = use_dedicated_coord_estimates ?
         GetDedicatedCoordMemoryEstimate() :
         GetPerExecutorMemoryEstimate();
+    VLOG(3) << "use_dedicated_coord_estimates=" << use_dedicated_coord_estimates
+            << " coord_backend_mem_to_admit=" << coord_backend_mem_to_admit
+            << " per_backend_mem_to_admit=" << per_backend_mem_to_admit;
     if (!mimic_old_behaviour) {
       int64_t min_mem_limit_required =
           ReservationUtil::GetMinMemLimitFromReservation(largest_min_reservation());
diff --git a/be/src/scheduling/schedule-state.h b/be/src/scheduling/schedule-state.h
index 9da8408..a011d94 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <map>
+#include <random>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -294,6 +295,8 @@ class ScheduleState {
 
   void set_executor_group(string executor_group);
 
+  std::mt19937* rng() { return &rng_; }
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -339,6 +342,9 @@ class ScheduleState {
   /// Scheduler and only valid after scheduling completes successfully.
   std::string executor_group_;
 
+  /// Random number generator used for any randomized decisions during scheduling.
+  std::mt19937 rng_;
+
   /// Map from fragment idx to references into the 'request_'.
   std::unordered_map<int32_t, const TPlanFragment&> fragments_;
 
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 2974081..8f0ea1e 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -17,6 +17,8 @@
 
 #include "scheduling/scheduler-test-util.h"
 
+#include <random>
+
 #include <boost/unordered_set.hpp>
 
 #include "common/names.h"
@@ -647,9 +649,10 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
   DCHECK(membership_snapshot->local_be_desc.get() != nullptr);
   Scheduler::ExecutorConfig executor_config =
       {no_executor_group ? empty_group : it->second, *membership_snapshot->local_be_desc};
-  return scheduler_->ComputeScanRangeAssignment(executor_config, 0,
-      nullptr, false, *locations, plan_.referenced_datanodes(), exec_at_coord,
-      plan_.query_options(), nullptr, assignment);
+  std::mt19937 rng(rand());
+  return scheduler_->ComputeScanRangeAssignment(executor_config, 0, nullptr, false,
+      *locations, plan_.referenced_datanodes(), exec_at_coord, plan_.query_options(),
+      nullptr, &rng, assignment);
 }
 
 void SchedulerWrapper::AddBackend(const Host& host) {
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 3f55c88..abd28ec 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -194,7 +194,7 @@ Status Scheduler::ComputeScanRangeAssignment(
       RETURN_IF_ERROR(
           ComputeScanRangeAssignment(executor_config, node_id, node_replica_preference,
               node_random_replica, *locations, exec_request.host_list, exec_at_coord,
-              state->query_options(), total_assignment_timer, assignment));
+              state->query_options(), total_assignment_timer, state->rng(), assignment));
       state->IncNumScanRanges(locations->size());
     }
   }
@@ -275,20 +275,51 @@ void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
             << fragment_state->fragment.display_name;
     CreateCollocatedJoinBuildInstances(fragment_state, state);
   } else if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
-    // case 1: root fragment instance executed at coordinator
-    VLOG(3) << "Computing exec params for coordinator fragment "
-            << fragment_state->fragment.display_name;
-    const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
-    const NetworkAddressPB& coord = local_be_desc.address();
-    DCHECK(local_be_desc.has_krpc_address());
-    const NetworkAddressPB& krpc_coord = local_be_desc.krpc_address();
-    DCHECK(IsResolvedAddress(krpc_coord));
+    // case 1: unpartitioned fragment - either the coordinator fragment at the root of
+    // the plan that must be executed at the coordinator or an unpartitioned fragment
+    // that can be executed anywhere.
+    VLOG(3) << "Computing exec params for "
+            << (fragment_state->is_coord_fragment ? "coordinator" : "unpartitioned")
+            << " fragment " << fragment_state->fragment.display_name;
+    NetworkAddressPB host;
+    NetworkAddressPB krpc_host;
+    if (fragment_state->is_coord_fragment || executor_config.group.NumExecutors() == 0) {
+      // The coordinator fragment must be scheduled on the coordinator. Otherwise if
+      // no executors are available, we need to schedule on the coordinator.
+      const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
+      host = local_be_desc.address();
+      DCHECK(local_be_desc.has_krpc_address());
+      krpc_host = local_be_desc.krpc_address();
+    } else if (fragment_state->exchange_input_fragments.size() > 0) {
+      // Interior unpartitioned fragments can be scheduled on an arbitrary executor.
+      // Pick a random instance from the first input fragment.
+      const FragmentScheduleState& input_fragment_state =
+          *state->GetFragmentScheduleState(fragment_state->exchange_input_fragments[0]);
+      int num_input_instances = input_fragment_state.instance_states.size();
+      int instance_idx =
+          std::uniform_int_distribution<int>(0, num_input_instances - 1)(*state->rng());
+      host = input_fragment_state.instance_states[instance_idx].host;
+      krpc_host = input_fragment_state.instance_states[instance_idx].krpc_host;
+    } else {
+      // Other fragments, e.g. ones with only a constant union or empty set, are scheduled
+      // on a random executor.
+      vector<BackendDescriptorPB> all_executors =
+          executor_config.group.GetAllExecutorDescriptors();
+      int idx = std::uniform_int_distribution<int>(0, all_executors.size() - 1)(
+          *state->rng());
+      const BackendDescriptorPB& be_desc = all_executors[idx];
+      host = be_desc.address();
+      DCHECK(be_desc.has_krpc_address());
+      krpc_host = be_desc.krpc_address();
+    }
+    VLOG(3) << "Scheduled unpartitioned fragment on " << krpc_host;
+    DCHECK(IsResolvedAddress(krpc_host));
     // make sure that the coordinator instance ends up with instance idx 0
     UniqueIdPB instance_id = fragment_state->is_coord_fragment ?
         state->query_id() :
         state->GetNextInstanceId();
     fragment_state->instance_states.emplace_back(
-        instance_id, coord, krpc_coord, 0, *fragment_state);
+        instance_id, host, krpc_host, 0, *fragment_state);
     FInstanceScheduleState& instance_state = fragment_state->instance_states.back();
     *fragment_state->exec_params->add_instances() = instance_id;
 
@@ -524,7 +555,7 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
     PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
-    const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
+    const TQueryOptions& query_options, RuntimeProfile::Counter* timer, std::mt19937* rng,
     FragmentScanRangeAssignment* assignment) {
   const ExecutorGroup& executor_group = executor_config.group;
   if (executor_group.NumExecutors() == 0 && !exec_at_coord) {
@@ -557,9 +588,8 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
   const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
   coord_only_executor_group.AddExecutor(local_be_desc);
   VLOG_ROW << "Exec at coord is " << (exec_at_coord ? "true" : "false");
-  AssignmentCtx assignment_ctx(
-      exec_at_coord ? coord_only_executor_group : executor_group, total_assignments_,
-      total_local_assignments_);
+  AssignmentCtx assignment_ctx(exec_at_coord ? coord_only_executor_group : executor_group,
+      total_assignments_, total_local_assignments_, rng);
 
   // Holds scan ranges that must be assigned for remote reads.
   vector<const TScanRangeLocationList*> remote_scan_range_locations;
@@ -863,15 +893,14 @@ void Scheduler::ComputeBackendExecParams(
 }
 
 Scheduler::AssignmentCtx::AssignmentCtx(const ExecutorGroup& executor_group,
-    IntCounter* total_assignments, IntCounter* total_local_assignments)
+    IntCounter* total_assignments, IntCounter* total_local_assignments, std::mt19937* rng)
   : executor_group_(executor_group),
     first_unused_executor_idx_(0),
     total_assignments_(total_assignments),
     total_local_assignments_(total_local_assignments) {
   DCHECK_GT(executor_group.NumExecutors(), 0);
   random_executor_order_ = executor_group.GetAllExecutorIps();
-  std::mt19937 g(rand());
-  std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), g);
+  std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), *rng);
   // Initialize inverted map for executor rank lookups
   int i = 0;
   for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = i++;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4a9be5d..d9abb03 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <random>
 #include <string>
 #include <vector>
 #include <boost/heap/binomial_heap.hpp>
@@ -151,7 +152,7 @@ class Scheduler {
   class AssignmentCtx {
    public:
     AssignmentCtx(const ExecutorGroup& executor_group, IntCounter* total_assignments,
-        IntCounter* total_local_assignments);
+        IntCounter* total_local_assignments, std::mt19937* rng);
 
     /// Among hosts in 'data_locations', select the one with the minimum number of
     /// assigned bytes. If executors have been assigned equal amounts of work and
@@ -344,13 +345,14 @@ class Scheduler {
   /// exec_at_coord:           Whether to schedule all scan ranges on the coordinator.
   /// query_options:           Query options for the current query.
   /// timer:                   Tracks execution time of ComputeScanRangeAssignment.
+  /// rng:                     Random number generator used for any random decisions.
   /// assignment:              Output parameter, to which new assignments will be added.
   Status ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
       PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
       bool node_random_replica, const std::vector<TScanRangeLocationList>& locations,
       const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
-      FragmentScanRangeAssignment* assignment);
+      std::mt19937* rng, FragmentScanRangeAssignment* assignment);
 
   /// Computes execution parameters for all backends assigned in the query and always one
   /// for the coordinator backend since it participates in execution regardless. Must be
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index d463913..a84862b 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -64,6 +64,15 @@ LOG = logging.getLogger('admission_test')
 # that running queries can be correlated with the thread that submitted them.
 QUERY = " union all ".join(["select * from functional.alltypesagg where id != {0}"] * 30)
 
+# Same query but with additional unpartitioned non-coordinator fragments.
+# The unpartitioned fragments are both interior fragments that consume input
+# from a scan fragment and non-interior fragments with a constant UNION.
+QUERY_WITH_UNPARTITIONED_FRAGMENTS = """
+    select *, (select count(distinct int_col) from alltypestiny) subquery1,
+           (select count(distinct int_col) from alltypes) subquery2,
+           (select 1234) subquery3
+    from (""" + QUERY + """) v"""
+
 # The statestore heartbeat and topic update frequency (ms). Set low for testing.
 STATESTORE_RPC_FREQUENCY_MS = 100
 
@@ -575,28 +584,34 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     'using_dedicated_coord_estimates' is true."""
     self.client.set_configuration_option('request_pool', "root.regularPool")
     ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
-    handle = self.client.execute_async(QUERY.format(1))
-    self.client.wait_for_finished_timeout(handle, 1000)
-    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
-    actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
-    mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
-    debug_string = " expected_mem_limits:" + str(
-      expected_mem_limits) + " actual_mem_limits:" + str(
-      actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
-    MB = 1 << 20
-    # Easiest way to check float in-equality.
-    assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
-      'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
-    # There may be some rounding errors so keep a margin of 5MB when verifying
-    assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
-      'coordinator']) < 5 * MB, debug_string
-    assert abs(actual_mem_limits['executor'] - expected_mem_limits[
-      'executor']) < 5 * MB, debug_string
-    assert abs(mem_admitted['coordinator'] - expected_mem_limits[
-      'coordinator']) < 5 * MB, debug_string
-    assert abs(
-      mem_admitted['executor'][0] - expected_mem_limits['executor']) < 5 * MB, \
-        debug_string
+    # Use a test query that has unpartitioned non-coordinator fragments to make
+    # sure those are handled correctly (IMPALA-10036).
+    for query in [QUERY, QUERY_WITH_UNPARTITIONED_FRAGMENTS]:
+      handle = self.client.execute_async(query.format(1))
+      self.client.wait_for_finished_timeout(handle, 1000)
+      expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+      actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
+      mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+      debug_string = " expected_mem_limits:" + str(
+        expected_mem_limits) + " actual_mem_limits:" + str(
+        actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
+      MB = 1 << 20
+      # Easiest way to check float in-equality.
+      assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
+        'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
+      # There may be some rounding errors so keep a margin of 5MB when verifying
+      assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
+        'coordinator']) < 5 * MB, debug_string
+      assert abs(actual_mem_limits['executor'] - expected_mem_limits[
+        'executor']) < 5 * MB, debug_string
+      assert abs(mem_admitted['coordinator'] - expected_mem_limits[
+        'coordinator']) < 5 * MB, debug_string
+      assert abs(
+        mem_admitted['executor'][0] - expected_mem_limits['executor']) < 5 * MB, \
+          debug_string
+      # Ensure all fragments finish executing before running next query.
+      self.client.fetch(query, handle)
+      self.client.close_query(handle)
 
   def __get_mem_limits_admission_debug_page(self):
     """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the