You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/01/21 05:50:33 UTC

[impala] 02/02: IMPALA-11063: Add metrics to expose state of each executor group set

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0aedf6021fa0e6ab4f93313c9dc7eae3b4860a46
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Tue Jan 11 20:16:19 2022 -0800

    IMPALA-11063: Add metrics to expose state of each executor group set
    
    This adds metrics for each executor group set that expose the number
    of executor groups, the number of healthy executor groups and the
    total number of backends associated with that group set.
    
    Testing:
    Added an e2e test to verify metrics are updated correctly.
    
    Change-Id: Ib39f940de830ef6302785aee30eeb847fa5deeba
    Reviewed-on: http://gerrit.cloudera.org:8080/18142
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/cluster-membership-mgr.cc  | 67 ++++++++++++++++++++---
 be/src/scheduling/cluster-membership-mgr.h   | 23 ++++++--
 common/thrift/metrics.json                   | 31 ++++++++++-
 tests/custom_cluster/test_executor_groups.py | 82 ++++++++++++++++++++++------
 4 files changed, 173 insertions(+), 30 deletions(-)

diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index afb9e41..ff66317 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -60,6 +60,13 @@ static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.tota
 static const string HEALTHY_EXEC_GROUP_KEY(
     "cluster-membership.executor-groups.total-healthy");
 static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
+// Per group set metrics
+static const string LIVE_EXEC_GROUP_KEY_FORMAT(
+    "cluster-membership.group-set.executor-groups.total.$0");
+static const string HEALTHY_EXEC_GROUP_KEY_FORMAT(
+    "cluster-membership.group-set.executor-groups.total-healthy.$0");
+static const string TOTAL_BACKENDS_KEY_FORMAT(
+    "cluster-membership.group-set.backends.total.$0");
 
 ClusterMembershipMgr::ClusterMembershipMgr(
     string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
@@ -71,16 +78,34 @@ ClusterMembershipMgr::ClusterMembershipMgr(
   if(!status.ok()) {
     LOG(FATAL) << "Error populating expected executor group sets: " << status;
   }
-  DCHECK(metrics != nullptr);
-  MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
-  total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
-  total_healthy_executor_groups_ = metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
-  total_backends_ = metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
+  InitMetrics(metrics);
   // Register the metric update function as a callback.
   RegisterUpdateCallbackFn([this](
       ClusterMembershipMgr::SnapshotPtr snapshot) { this->UpdateMetrics(snapshot); });
 }
 
+void ClusterMembershipMgr::InitMetrics(MetricGroup* metrics) {
+  DCHECK(metrics != nullptr);
+  MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
+  aggregated_group_set_metrics_.total_live_executor_groups_ =
+      metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
+  aggregated_group_set_metrics_.total_healthy_executor_groups_ =
+      metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
+  aggregated_group_set_metrics_.total_backends_ =
+      metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
+
+  for (auto& set : expected_exec_group_sets_) {
+    GroupSetMetrics grp_set_metrics;
+    grp_set_metrics.total_live_executor_groups_ =
+        metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY_FORMAT, 0, set.exec_group_name_prefix);
+    grp_set_metrics.total_healthy_executor_groups_ = metric_grp->AddCounter(
+        HEALTHY_EXEC_GROUP_KEY_FORMAT, 0, set.exec_group_name_prefix);
+    grp_set_metrics.total_backends_ =
+        metric_grp->AddCounter(TOTAL_BACKENDS_KEY_FORMAT, 0, set.exec_group_name_prefix);
+    per_group_set_metrics_.insert({set.exec_group_name_prefix, grp_set_metrics});
+  }
+}
+
 Status ClusterMembershipMgr::Init() {
   LOG(INFO) << "Starting cluster membership manager";
   if (statestore_subscriber_ == nullptr) {
@@ -565,9 +590,35 @@ void ClusterMembershipMgr::UpdateMetrics(const SnapshotPtr& new_state){
     }
   }
   DCHECK_GE(total_live_executor_groups, healthy_executor_groups);
-  total_live_executor_groups_->SetValue(total_live_executor_groups);
-  total_healthy_executor_groups_->SetValue(healthy_executor_groups);
-  total_backends_->SetValue(new_state->current_backends.size());
+  aggregated_group_set_metrics_.total_live_executor_groups_->SetValue(
+      total_live_executor_groups);
+  aggregated_group_set_metrics_.total_healthy_executor_groups_->SetValue(
+      healthy_executor_groups);
+  aggregated_group_set_metrics_.total_backends_->SetValue(
+      new_state->current_backends.size());
+
+  for (auto& set : expected_exec_group_sets_) {
+    int total_backends = 0;
+    int64_t total_live_exec_groups = 0;
+    int64_t healthy_exec_groups = 0;
+    StringPiece prefix(set.exec_group_name_prefix);
+    for (const auto& group_it : new_state->executor_groups) {
+      StringPiece name(group_it.first);
+      if (!name.starts_with(prefix)) continue;
+      const ExecutorGroup& group = group_it.second;
+      if (group.IsHealthy()) {
+        ++healthy_exec_groups;
+      }
+      if (group.NumHosts() > 0) {
+        ++total_live_exec_groups;
+        total_backends += group.NumExecutors();
+      }
+    }
+    auto& grp_metrics = per_group_set_metrics_[set.exec_group_name_prefix];
+    grp_metrics.total_live_executor_groups_->SetValue(total_live_exec_groups);
+    grp_metrics.total_healthy_executor_groups_->SetValue(healthy_exec_groups);
+    grp_metrics.total_backends_->SetValue(total_backends);
+  }
 }
 
 bool ClusterMembershipMgr::IsBackendInExecutorGroups(
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index 12bfae4..7ce425c 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -176,6 +176,12 @@ class ClusterMembershipMgr {
   }
 
  private:
+  struct GroupSetMetrics {
+    IntCounter* total_live_executor_groups_ = nullptr;
+    IntCounter* total_healthy_executor_groups_ = nullptr;
+    IntCounter* total_backends_ = nullptr;
+  };
+
   /// Serializes and adds the local backend descriptor to 'subscriber_topic_updates'.
   void AddLocalBackendToStatestore(const BackendDescriptorPB& local_be_desc,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -206,7 +212,7 @@ class ClusterMembershipMgr {
   /// Updates the membership metrics. Is registered as an updated callback function to
   /// receive any membership updates. The only exception is that this is called directly
   /// in BlacklistExecutor() where updates are not required to be sent to external
-  /// listeners.
+  /// listeners. InitMetrics() must have been called before registering this method.
   void UpdateMetrics(const SnapshotPtr& new_state);
 
   /// Returns true if the 'be_desc' is in any of the 'executor_groups', false otherwise.
@@ -221,6 +227,10 @@ class ClusterMembershipMgr {
   static Status PopulateExpectedExecGroupSets(
       std::vector<TExecutorGroupSet>& expected_exec_group_sets);
 
+  /// Helper method to initialize metrics. Also initializes metrics for all group sets if
+  /// expected group sets are specified.
+  void InitMetrics(MetricGroup* metrics);
+
   /// An empty group used for scheduling coordinator only queries.
   const ExecutorGroup empty_exec_group_;
 
@@ -233,10 +243,13 @@ class ClusterMembershipMgr {
   /// locks in this class.
   std::mutex update_membership_lock_;
 
-  /// Membership metrics
-  IntCounter* total_live_executor_groups_ = nullptr;
-  IntCounter* total_healthy_executor_groups_ = nullptr;
-  IntCounter* total_backends_ = nullptr;
+  /// Membership metrics for all executor groups regardless of which group set they belong
+  /// to. The value for total backends also includes the coordinators.
+  GroupSetMetrics aggregated_group_set_metrics_;
+
+  /// Maps from the group set prefix to its group set metrics. Is empty if group sets are
+  /// not used.
+  std::unordered_map<string, GroupSetMetrics> per_group_set_metrics_;
 
   /// The snapshot of the current cluster membership. When receiving changes to the
   /// executors configuration from the statestore we will make a copy of the stored
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 520bbe9..5780c83 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2762,7 +2762,7 @@
     "kind": "COUNTER",
     "key": "cluster-membership.executor-groups.total-healthy"
   },{
-    "description": "Total number of backends registered with the statestore",
+    "description": "Total number of backends (both coordinators and executors) registered with the statestore",
     "contexts": [
       "IMPALAD"
     ],
@@ -2772,6 +2772,35 @@
     "key": "cluster-membership.backends.total"
   },
   {
+    "description": "Total number of executor groups in the executor group set $0 that have at least one executor",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups in the executor group set $0 that have at least one executor",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.group-set.executor-groups.total.$0"
+  },
+  {
+    "description": "Total number of executor groups in the executor group set $0 that are in a healthy state, that is, have at least the configured minimum number of executors to be considered for admission",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups in the executor group set $0 that are in a healthy state",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.group-set.executor-groups.total-healthy.$0"
+  },{
+    "description": "Total number of backends in the executor group set $0 registered with the statestore",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executors in the executor group set $0 registered with the statestore",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.group-set.backends.total.$0"
+  },
+  {
   "description": "Total number of queries admitted on this coordinator running on executor group: $0",
   "contexts": [
     "IMPALAD"
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 6baed97..a35ca8e 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -33,6 +33,7 @@ TEST_QUERY = "select count(*) from functional.alltypes where month + random() <
 
 DEFAULT_RESOURCE_POOL = "default-pool"
 
+
 class TestExecutorGroups(CustomClusterTestSuite):
   """This class contains tests that exercise the logic related to scaling clusters up and
   down by adding and removing groups of executors. All tests start with a base cluster
@@ -97,7 +98,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                expected_num_impalads=num_coordinators,
                                use_exclusive_coordinators=True)
     self.coordinator = self.cluster.impalads[0]
-    self.num_impalads = 2
+    self.num_impalads = num_coordinators
 
   def _get_total_admitted_queries(self):
     """Returns the total number of queries that have been admitted to the default resource
@@ -120,22 +121,32 @@ class TestExecutorGroups(CustomClusterTestSuite):
       return self.coordinator.service.wait_for_metric_value(
         "cluster-membership.executor-groups.total", num_exec_grps, timeout=30)
 
-  def _get_num_executor_groups(self, only_healthy=False):
+  def _get_num_executor_groups(self, only_healthy=False, exec_group_set_prefix=None):
     """Returns the number of executor groups with at least one executor. If
-    'only_healthy' is True, only the number of healthy executor groups is returned."""
-    if only_healthy:
-      return self.coordinator.service.get_metric_value(
-        "cluster-membership.executor-groups.total-healthy")
+    'only_healthy' is True, only the number of healthy executor groups is returned.
+    If exec_group_set_prefix is used, it returns the metric corresponding to that
+    executor group set."""
+    metric_name = ""
+    if exec_group_set_prefix is None:
+      if only_healthy:
+        metric_name = "cluster-membership.executor-groups.total-healthy"
+      else:
+        metric_name = "cluster-membership.executor-groups.total"
     else:
-      return self.coordinator.service.get_metric_value(
-        "cluster-membership.executor-groups.total")
-
-  def _get_num_queries_executing_for_exec_group(self, group_name_suffix):
-    """Returns the number of queries running on the executor group 'group_name_suffix'.
+      if only_healthy:
+        metric_name = "cluster-membership.group-set.executor-groups.total-healthy.{0}"\
+          .format(exec_group_set_prefix)
+      else:
+        metric_name = "cluster-membership.group-set.executor-groups.total.{0}"\
+          .format(exec_group_set_prefix)
+    return self.coordinator.service.get_metric_value(metric_name)
+
+  def _get_num_queries_executing_for_exec_group(self, group_name_prefix):
+    """Returns the number of queries running on the executor group 'group_name_prefix'.
     None is returned if the group has no executors or does not exist."""
     METRIC_PREFIX = "admission-controller.executor-group.num-queries-executing.{0}"
     return self.coordinator.service.get_metric_value(
-      METRIC_PREFIX.format(self._group_name(DEFAULT_RESOURCE_POOL, group_name_suffix)))
+      METRIC_PREFIX.format(self._group_name(DEFAULT_RESOURCE_POOL, group_name_prefix)))
 
   def _assert_eventually_in_profile(self, query_handle, expected_str):
     """Assert with a timeout of 60 sec and a polling interval of 1 sec that the
@@ -604,10 +615,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
     llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml")
     # Start with a regular admission config with multiple pools and no resource limits.
     self._restart_coordinators(num_coordinators=2,
-                               extra_args="-vmodule admission-controller=3 "
-                                          "-fair_scheduler_allocation_path %s "
-                                          "-llama_site_path %s" % (
-                                            fs_allocation_path, llama_site_path))
+         extra_args="-vmodule admission-controller=3 "
+                    "-expected_executor_group_sets=root.queue1:2,root.queue2:1 "
+                    "-fair_scheduler_allocation_path %s "
+                    "-llama_site_path %s" % (
+                      fs_allocation_path, llama_site_path))
 
     # Create fresh clients
     second_coord_client = self.create_client_for_nth_impalad(1)
@@ -619,6 +631,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._add_executor_group("group", 1, admission_control_slots=1,
                              resource_pool="root.queue2", extra_args="-mem_limit=2g")
     assert self._get_num_executor_groups(only_healthy=True) == 2
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue1") == 1
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue2") == 1
 
     # Execute a long running query on group 'queue1'
     self.client.set_configuration({'request_pool': 'queue1'})
@@ -663,3 +679,37 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
     self.client.close()
     second_coord_client.close()
+
+  @pytest.mark.execute_serially
+  def test_per_exec_group_set_metrics(self):
+    """This test verifies that the metrics for each exec group set are updated
+    appropriately."""
+    self._restart_coordinators(num_coordinators=1,
+         extra_args="-expected_executor_group_sets=root.queue1:2,root.queue2:1")
+
+    # Add an unhealthy exec group in queue1 group set
+    self._add_executor_group("group", 2, num_executors=1,
+                             resource_pool="root.queue1", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue1") == 0
+    assert self._get_num_executor_groups(exec_group_set_prefix="root.queue1") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.group-set.backends.total.root.queue1") == 1
+
+    # Add another executor to the previous group to make healthy again
+    self._add_executor_group("group", 2, num_executors=1,
+                             resource_pool="root.queue1", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue1") == 1
+    assert self._get_num_executor_groups(exec_group_set_prefix="root.queue1") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.group-set.backends.total.root.queue1") == 2
+
+    # Add a healthy exec group in queue2 group set
+    self._add_executor_group("group", 1,
+                             resource_pool="root.queue2", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue2") == 1
+    assert self._get_num_executor_groups(exec_group_set_prefix="root.queue2") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.group-set.backends.total.root.queue2") == 1