You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/07/21 08:22:37 UTC

[impala] 02/03: IMPALA-12300: Turn CheckEffectiveInstanceCount to print warning

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0bee563073ef04b04446a2d1acf279ac4bc596e2
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Jul 19 15:25:17 2023 -0700

    IMPALA-12300: Turn CheckEffectiveInstanceCount to print warning
    
    Scheduler::CheckEffectiveInstanceCount was added to check consistency
    between FE planning and BE scheduling if COMPUTE_PROCESSING_COST=true.
    This consistency can be broken if there is a cluster membership
    change (a new executor comes online) between FE planning and BE
    scheduling. For example, in an executor group of size 10 with a 90%
    health threshold, the admission controller is allowed to run a query
    when only 9 executors are available. If the 10th executor comes online
    between FE planning and BE scheduling, CheckEffectiveInstanceCount can
    fail and return an error.
    
    This patch turns two error statuses in CheckEffectiveInstanceCount into
    warnings, reported either in the query profile as an InfoString or in
    the WARNING log. The MAX_FRAGMENT_INSTANCES_PER_NODE violation check
    still returns an error.
    
    Testing:
    - Add test_75_percent_availability
    - Pass test_executors.py
    
    Change-Id: Ieaf6a46c4f12dbf8b03d1618c2f090ab4f2ac665
    Reviewed-on: http://gerrit.cloudera.org:8080/20231
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/scheduler.cc               | 42 +++++++++++--------
 be/src/scheduling/scheduler.h                |  2 +-
 tests/custom_cluster/test_executor_groups.py | 61 ++++++++++++++++++++++++++--
 3 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 64e8f9ce5..d5ebed49d 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -61,6 +61,7 @@ namespace impala {
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
 static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
+static const string SCHEDULER_WARNING_KEY("Scheduler Warning");
 
 static const vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE,
     TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
@@ -267,17 +268,23 @@ Status Scheduler::ComputeFragmentExecParams(
 }
 
 Status Scheduler::CheckEffectiveInstanceCount(
-    const FragmentScheduleState* fragment_state, const ScheduleState* state) {
+    const FragmentScheduleState* fragment_state, ScheduleState* state) {
   // These checks are only intended if COMPUTE_PROCESSING_COST=true.
   if (!state->query_options().compute_processing_cost) return Status::OK();
 
   int effective_instance_count = fragment_state->fragment.effective_instance_count;
   if (effective_instance_count < fragment_state->instance_states.size()) {
-    return Status(
-        Substitute("$0 scheduled $1 instances, higher than the effective count ($2). "
-                   "Consider running the query with COMPUTE_PROCESSING_COST=false.",
-            fragment_state->fragment.display_name, fragment_state->instance_states.size(),
-            effective_instance_count));
+    if (state->summary_profile()->GetInfoString(SCHEDULER_WARNING_KEY) == nullptr) {
+      state->summary_profile()->AddInfoString(SCHEDULER_WARNING_KEY,
+          "Cluster membership might changed between planning and scheduling");
+    }
+
+    string warn_message = Substitute(
+        "$0 scheduled instance count ($1) is higher than its effective count ($2)",
+        fragment_state->fragment.display_name, fragment_state->instance_states.size(),
+        effective_instance_count);
+    state->summary_profile()->AppendInfoString(SCHEDULER_WARNING_KEY, warn_message);
+    LOG(WARNING) << warn_message;
   }
 
   DCHECK(!fragment_state->instance_states.empty());
@@ -308,24 +315,23 @@ Status Scheduler::CheckEffectiveInstanceCount(
   QueryConstants qc;
   if (largest_inst_per_host > qc.MAX_FRAGMENT_INSTANCES_PER_NODE) {
     return Status(Substitute(
-        "$0 scheduled $1 instances, higher than maximum instances per node ($2). "
-        "Consider running the query with COMPUTE_PROCESSING_COST=false.",
+        "$0 scheduled instance count ($1) is higher than maximum instances per node"
+        " ($2), indicating a planner bug. Consider running the query with"
+        " COMPUTE_PROCESSING_COST=false.",
         fragment_state->fragment.display_name, largest_inst_per_host,
         qc.MAX_FRAGMENT_INSTANCES_PER_NODE));
   }
 
   int planned_inst_per_host = ceil((float)effective_instance_count / num_host);
   if (largest_inst_per_host > planned_inst_per_host) {
-    stringstream err_msg;
-    err_msg << fragment_state->fragment.display_name
-            << " has imbalance number of instance to host assignment."
-            << " Consider running the query with COMPUTE_PROCESSING_COST=false."
-            << " Host " << fragment_state->instance_states[largest_inst_idx].host
-            << " has " << largest_inst_per_host << " instances assigned."
-            << " effective_instance_count=" << effective_instance_count
-            << " planned_inst_per_host=" << planned_inst_per_host
-            << " num_host=" << num_host;
-    return Status(err_msg.str());
+    LOG(WARNING) << fragment_state->fragment.display_name
+                 << " has imbalance number of instance to host assignment."
+                 << " Consider running the query with COMPUTE_PROCESSING_COST=false."
+                 << " Host " << fragment_state->instance_states[largest_inst_idx].host
+                 << " has " << largest_inst_per_host << " instances assigned."
+                 << " effective_instance_count=" << effective_instance_count
+                 << " planned_inst_per_host=" << planned_inst_per_host
+                 << " num_host=" << num_host;
   }
   return Status::OK();
 }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 600a74851..ee82b4997 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -444,7 +444,7 @@ class Scheduler {
   /// instance_states size match with effective_instance_count. Fragment with UnionNode or
   /// ScanNode or one where IsExceedMaxFsWriters equals true is not checked.
   static Status CheckEffectiveInstanceCount(
-      const FragmentScheduleState* fragment_state, const ScheduleState* state);
+      const FragmentScheduleState* fragment_state, ScheduleState* state);
 
   /// Check if sink_fragment_state has hdfs_table_sink AND ref_fragment_state scheduled
   /// to exceed max_fs_writers query option.
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index dd8693d47..42c900b47 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -826,7 +826,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # The path to resources directory which contains the admission control config files.
     RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
                                  "resources")
-    # Define two group sets: tiny, small and large
+    # Define three group sets: tiny, small and large
     fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-3-groups.xml")
     # Define the min-query-mem-limit, max-query-mem-limit,
     # max-query-cpu-core-per-node-limit and max-query-cpu-core-coordinator-limit
@@ -844,8 +844,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "-llama_site_path %s "
         "%s ")
 
-    # Start with a regular admission config, multiple pools, no resource limits,
-    # and query_cpu_count_divisor=2.
+    # Start with a regular admission config, multiple pools, no resource limits.
     self._restart_coordinators(num_coordinators=1,
         extra_args=extra_args_template % (fs_allocation_path, llama_site_path,
           coordinator_test_args))
@@ -1384,3 +1383,59 @@ class TestExecutorGroups(CustomClusterTestSuite):
         QUERY, {'request_pool': 'queue1'}, "Executor Group: root.queue1-group2")
     self.client.close()
     second_coord_client.close()
+
+  @pytest.mark.execute_serially
+  def test_75_percent_availability(self):
+    """Test query planning and execution when only 75% of executor is up.
+    This test will run query over 8 node executor group at its healthy threshold (6) and
+    start the other 2 executor after query is planned.
+    """
+    coordinator_test_args = ''
+    # The path to resources directory which contains the admission control config files.
+    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
+                                 "resources")
+
+    # Reuse cluster configuration from _setup_three_exec_group_cluster, but only start
+    # root.large executor groups.
+    fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-3-groups.xml")
+    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml")
+
+    # extra args template to start coordinator
+    extra_args_template = ("-vmodule admission-controller=3 "
+        "-admission_control_slots=8 "
+        "-expected_executor_group_sets=root.large:8 "
+        "-fair_scheduler_allocation_path %s "
+        "-llama_site_path %s "
+        "%s ")
+
+    # Start with a regular admission config, multiple pools, no resource limits.
+    self._restart_coordinators(num_coordinators=1,
+        extra_args=extra_args_template % (fs_allocation_path, llama_site_path,
+          coordinator_test_args))
+
+    # Create fresh client
+    self.create_impala_clients()
+    # Start root.large exec group with 8 admission slots and 6 executors.
+    self._add_executor_group("group", 6, num_executors=6, admission_control_slots=8,
+                             resource_pool="root.large", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=False) == 1
+    assert self._get_num_executor_groups(only_healthy=False,
+                                         exec_group_set_prefix="root.large") == 1
+
+    # Run query and let it compile, but delay admission for 5s
+    handle = self.execute_query_async(CPU_TEST_QUERY, {
+      "COMPUTE_PROCESSING_COST": "true",
+      "DEBUG_ACTION": "AC_BEFORE_ADMISSION:SLEEP@5000"})
+
+    # Start the next 2 executors.
+    self._add_executors("group", 6, num_executors=2, resource_pool="root.large",
+        extra_args="-mem_limit=2g", expected_num_impalads=9)
+
+    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    profile = self.client.get_runtime_profile(handle)
+    assert "F00:PLAN FRAGMENT [RANDOM] hosts=6 instances=12" in profile, profile
+    assert ("Scheduler Warning: Cluster membership might changed between planning and "
+        "scheduling, F00 scheduled instance count (16) is higher than its effective "
+        "count (12)") in profile, profile
+    assert "00:SCAN HDFS               8     16" in profile, profile
+    self.client.close_query(handle)