Posted to commits@impala.apache.org by st...@apache.org on 2022/01/21 05:50:31 UTC

[impala] branch master updated (3f51a6a -> 0aedf60)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 3f51a6a  IMPALA-11051: Add support for 'void' Iceberg partition transform
     new 402c12b  IMPALA-11075: Use latest API for get_partitions_ps_with_auth()
     new 0aedf60  IMPALA-11063: Add metrics to expose state of each executor group set

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/scheduling/cluster-membership-mgr.cc        | 67 +++++++++++++++---
 be/src/scheduling/cluster-membership-mgr.h         | 23 ++++--
 common/thrift/metrics.json                         | 31 +++++++-
 .../catalog/metastore/MetastoreServiceHandler.java |  9 +--
 tests/custom_cluster/test_executor_groups.py       | 82 +++++++++++++++++-----
 5 files changed, 174 insertions(+), 38 deletions(-)

[impala] 02/02: IMPALA-11063: Add metrics to expose state of each executor group set

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0aedf6021fa0e6ab4f93313c9dc7eae3b4860a46
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Tue Jan 11 20:16:19 2022 -0800

    IMPALA-11063: Add metrics to expose state of each executor group set
    
    This adds metrics for each executor group set that expose the number
    of executor groups, the number of healthy executor groups, and the
    total number of backends associated with that group set.
    
    Testing:
    Added an e2e test to verify metrics are updated correctly.
    
    Change-Id: Ib39f940de830ef6302785aee30eeb847fa5deeba
    Reviewed-on: http://gerrit.cloudera.org:8080/18142
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/cluster-membership-mgr.cc  | 67 ++++++++++++++++++++---
 be/src/scheduling/cluster-membership-mgr.h   | 23 ++++++--
 common/thrift/metrics.json                   | 31 ++++++++++-
 tests/custom_cluster/test_executor_groups.py | 82 ++++++++++++++++++++++------
 4 files changed, 173 insertions(+), 30 deletions(-)

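For every executor group set named in the -expected_executor_group_sets startup flag (the "prefix:size" pairs exercised by the tests below), this commit registers three per-set counters next to the existing cluster-wide ones. A minimal Python sketch (not Impala code) of how the concrete metric keys are derived; the {prefix} formatting stands in for the $0 Substitute placeholders in the C++ below:

    # Sketch only: derive the per-group-set metric keys added by this commit.
    # The flag value is the one used in tests/custom_cluster/test_executor_groups.py.
    FLAG = "root.queue1:2,root.queue2:1"

    PER_SET_KEY_FORMATS = [
        "cluster-membership.group-set.executor-groups.total.{prefix}",
        "cluster-membership.group-set.executor-groups.total-healthy.{prefix}",
        "cluster-membership.group-set.backends.total.{prefix}",
    ]

    for pair in FLAG.split(","):
        prefix = pair.split(":")[0]  # drop the expected size after ':'
        for fmt in PER_SET_KEY_FORMATS:
            print(fmt.format(prefix=prefix))
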
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index afb9e41..ff66317 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -60,6 +60,13 @@ static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.tota
 static const string HEALTHY_EXEC_GROUP_KEY(
     "cluster-membership.executor-groups.total-healthy");
 static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
+// Per group set metrics
+static const string LIVE_EXEC_GROUP_KEY_FORMAT(
+    "cluster-membership.group-set.executor-groups.total.$0");
+static const string HEALTHY_EXEC_GROUP_KEY_FORMAT(
+    "cluster-membership.group-set.executor-groups.total-healthy.$0");
+static const string TOTAL_BACKENDS_KEY_FORMAT(
+    "cluster-membership.group-set.backends.total.$0");
 
 ClusterMembershipMgr::ClusterMembershipMgr(
     string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
@@ -71,16 +78,34 @@ ClusterMembershipMgr::ClusterMembershipMgr(
   if(!status.ok()) {
     LOG(FATAL) << "Error populating expected executor group sets: " << status;
   }
-  DCHECK(metrics != nullptr);
-  MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
-  total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
-  total_healthy_executor_groups_ = metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
-  total_backends_ = metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
+  InitMetrics(metrics);
   // Register the metric update function as a callback.
   RegisterUpdateCallbackFn([this](
       ClusterMembershipMgr::SnapshotPtr snapshot) { this->UpdateMetrics(snapshot); });
 }
 
+void ClusterMembershipMgr::InitMetrics(MetricGroup* metrics) {
+  DCHECK(metrics != nullptr);
+  MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
+  aggregated_group_set_metrics_.total_live_executor_groups_ =
+      metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
+  aggregated_group_set_metrics_.total_healthy_executor_groups_ =
+      metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
+  aggregated_group_set_metrics_.total_backends_ =
+      metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
+
+  for (auto& set : expected_exec_group_sets_) {
+    GroupSetMetrics grp_set_metrics;
+    grp_set_metrics.total_live_executor_groups_ =
+        metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY_FORMAT, 0, set.exec_group_name_prefix);
+    grp_set_metrics.total_healthy_executor_groups_ = metric_grp->AddCounter(
+        HEALTHY_EXEC_GROUP_KEY_FORMAT, 0, set.exec_group_name_prefix);
+    grp_set_metrics.total_backends_ =
+        metric_grp->AddCounter(TOTAL_BACKENDS_KEY_FORMAT, 0, set.exec_group_name_prefix);
+    per_group_set_metrics_.insert({set.exec_group_name_prefix, grp_set_metrics});
+  }
+}
+
 Status ClusterMembershipMgr::Init() {
   LOG(INFO) << "Starting cluster membership manager";
   if (statestore_subscriber_ == nullptr) {
@@ -565,9 +590,35 @@ void ClusterMembershipMgr::UpdateMetrics(const SnapshotPtr& new_state){
     }
   }
   DCHECK_GE(total_live_executor_groups, healthy_executor_groups);
-  total_live_executor_groups_->SetValue(total_live_executor_groups);
-  total_healthy_executor_groups_->SetValue(healthy_executor_groups);
-  total_backends_->SetValue(new_state->current_backends.size());
+  aggregated_group_set_metrics_.total_live_executor_groups_->SetValue(
+      total_live_executor_groups);
+  aggregated_group_set_metrics_.total_healthy_executor_groups_->SetValue(
+      healthy_executor_groups);
+  aggregated_group_set_metrics_.total_backends_->SetValue(
+      new_state->current_backends.size());
+
+  for (auto& set : expected_exec_group_sets_) {
+    int total_backends = 0;
+    int64_t total_live_exec_groups = 0;
+    int64_t healthy_exec_groups = 0;
+    StringPiece prefix(set.exec_group_name_prefix);
+    for (const auto& group_it : new_state->executor_groups) {
+      StringPiece name(group_it.first);
+      if (!name.starts_with(prefix)) continue;
+      const ExecutorGroup& group = group_it.second;
+      if (group.IsHealthy()) {
+        ++healthy_exec_groups;
+      }
+      if (group.NumHosts() > 0) {
+        ++total_live_exec_groups;
+        total_backends += group.NumExecutors();
+      }
+    }
+    auto& grp_metrics = per_group_set_metrics_[set.exec_group_name_prefix];
+    grp_metrics.total_live_executor_groups_->SetValue(total_live_exec_groups);
+    grp_metrics.total_healthy_executor_groups_->SetValue(healthy_exec_groups);
+    grp_metrics.total_backends_->SetValue(total_backends);
+  }
 }
 
 bool ClusterMembershipMgr::IsBackendInExecutorGroups(
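
The per-set loop added to UpdateMetrics() above classifies each executor group by a prefix match on its name. A compact restatement of the counting rules, sketched in Python for brevity (not Impala code): a group counts as healthy per IsHealthy(), counts as live once it has at least one host, and contributes its executors to the set's backend total only when live.

    # Sketch of the per-group-set aggregation performed in UpdateMetrics().
    # 'groups' maps an executor group name to (num_hosts, num_executors, is_healthy).
    def aggregate_group_set(groups, prefix):
        live = healthy = backends = 0
        for name, (num_hosts, num_executors, is_healthy) in groups.items():
            if not name.startswith(prefix):
                continue  # only groups under this set's name prefix count
            if is_healthy:
                healthy += 1
            if num_hosts > 0:  # "live" means the group has at least one host
                live += 1
                backends += num_executors
        return live, healthy, backends

    # One healthy two-executor group plus one empty group under "root.queue1":
    print(aggregate_group_set(
        {"root.queue1-group1": (2, 2, True), "root.queue1-group2": (0, 0, False)},
        "root.queue1"))  # -> (1, 1, 2)
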
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index 12bfae4..7ce425c 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -176,6 +176,12 @@ class ClusterMembershipMgr {
   }
 
  private:
+  struct GroupSetMetrics {
+    IntCounter* total_live_executor_groups_ = nullptr;
+    IntCounter* total_healthy_executor_groups_ = nullptr;
+    IntCounter* total_backends_ = nullptr;
+  };
+
   /// Serializes and adds the local backend descriptor to 'subscriber_topic_updates'.
   void AddLocalBackendToStatestore(const BackendDescriptorPB& local_be_desc,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -206,7 +212,7 @@ class ClusterMembershipMgr {
   /// Updates the membership metrics. Is registered as an update callback function to
   /// receive any membership updates. The only exception is that this is called directly
   /// in BlacklistExecutor() where updates are not required to be sent to external
-  /// listeners.
+  /// listeners. InitMetrics() must have been called before registering this method.
   void UpdateMetrics(const SnapshotPtr& new_state);
 
   /// Returns true if the 'be_desc' is in any of the 'executor_groups', false otherwise.
@@ -221,6 +227,10 @@ class ClusterMembershipMgr {
   static Status PopulateExpectedExecGroupSets(
       std::vector<TExecutorGroupSet>& expected_exec_group_sets);
 
+  /// Helper method to initialize metrics. Also initializes metrics for all group sets if
+  /// expected group sets are specified.
+  void InitMetrics(MetricGroup* metrics);
+
   /// An empty group used for scheduling coordinator only queries.
   const ExecutorGroup empty_exec_group_;
 
@@ -233,10 +243,13 @@ class ClusterMembershipMgr {
   /// locks in this class.
   std::mutex update_membership_lock_;
 
-  /// Membership metrics
-  IntCounter* total_live_executor_groups_ = nullptr;
-  IntCounter* total_healthy_executor_groups_ = nullptr;
-  IntCounter* total_backends_ = nullptr;
+  /// Membership metrics for all executor groups regardless of which group set they belong
+  /// to. The value for total backends also includes the coordinators.
+  GroupSetMetrics aggregated_group_set_metrics_;
+
+  /// Maps from the group set prefix to its group set metrics. Is empty if group sets are
+  /// not used.
+  std::unordered_map<string, GroupSetMetrics> per_group_set_metrics_;
 
   /// The snapshot of the current cluster membership. When receiving changes to the
   /// executors configuration from the statestore we will make a copy of the stored
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 520bbe9..5780c83 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2762,7 +2762,7 @@
     "kind": "COUNTER",
     "key": "cluster-membership.executor-groups.total-healthy"
   },{
-    "description": "Total number of backends registered with the statestore",
+    "description": "Total number of backends (both coordinators and executors) registered with the statestore",
     "contexts": [
       "IMPALAD"
     ],
@@ -2772,6 +2772,35 @@
     "key": "cluster-membership.backends.total"
   },
   {
+    "description": "Total number of executor groups in the executor group set $0 that have at least one executor",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups in the executor group set $0 that have at least one executor",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.group-set.executor-groups.total.$0"
+  },
+  {
+    "description": "Total number of executor groups in the executor group set $0 that are in a healthy state, that is, have at least the configured minimum number of executors to be considered for admission",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups in the executor group set $0 that are in a healthy state",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.group-set.executor-groups.total-healthy.$0"
+  },{
+    "description": "Total number of backends in the executor group set $0 registered with the statestore",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executors in the executor group set $0 registered with the statestore",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.group-set.backends.total.$0"
+  },
+  {
   "description": "Total number of queries admitted on this coordinator running on executor group: $0",
   "contexts": [
     "IMPALAD"
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 6baed97..a35ca8e 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -33,6 +33,7 @@ TEST_QUERY = "select count(*) from functional.alltypes where month + random() <
 
 DEFAULT_RESOURCE_POOL = "default-pool"
 
+
 class TestExecutorGroups(CustomClusterTestSuite):
   """This class contains tests that exercise the logic related to scaling clusters up and
   down by adding and removing groups of executors. All tests start with a base cluster
@@ -97,7 +98,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                expected_num_impalads=num_coordinators,
                                use_exclusive_coordinators=True)
     self.coordinator = self.cluster.impalads[0]
-    self.num_impalads = 2
+    self.num_impalads = num_coordinators
 
   def _get_total_admitted_queries(self):
     """Returns the total number of queries that have been admitted to the default resource
@@ -120,22 +121,32 @@ class TestExecutorGroups(CustomClusterTestSuite):
       return self.coordinator.service.wait_for_metric_value(
         "cluster-membership.executor-groups.total", num_exec_grps, timeout=30)
 
-  def _get_num_executor_groups(self, only_healthy=False):
+  def _get_num_executor_groups(self, only_healthy=False, exec_group_set_prefix=None):
     """Returns the number of executor groups with at least one executor. If
-    'only_healthy' is True, only the number of healthy executor groups is returned."""
-    if only_healthy:
-      return self.coordinator.service.get_metric_value(
-        "cluster-membership.executor-groups.total-healthy")
+    'only_healthy' is True, only the number of healthy executor groups is returned.
+    If 'exec_group_set_prefix' is specified, the metric corresponding to that
+    executor group set is returned."""
+    metric_name = ""
+    if exec_group_set_prefix is None:
+      if only_healthy:
+        metric_name = "cluster-membership.executor-groups.total-healthy"
+      else:
+        metric_name = "cluster-membership.executor-groups.total"
     else:
-      return self.coordinator.service.get_metric_value(
-        "cluster-membership.executor-groups.total")
-
-  def _get_num_queries_executing_for_exec_group(self, group_name_suffix):
-    """Returns the number of queries running on the executor group 'group_name_suffix'.
+      if only_healthy:
+        metric_name = "cluster-membership.group-set.executor-groups.total-healthy.{0}"\
+          .format(exec_group_set_prefix)
+      else:
+        metric_name = "cluster-membership.group-set.executor-groups.total.{0}"\
+          .format(exec_group_set_prefix)
+    return self.coordinator.service.get_metric_value(metric_name)
+
+  def _get_num_queries_executing_for_exec_group(self, group_name_prefix):
+    """Returns the number of queries running on the executor group 'group_name_prefix'.
     None is returned if the group has no executors or does not exist."""
     METRIC_PREFIX = "admission-controller.executor-group.num-queries-executing.{0}"
     return self.coordinator.service.get_metric_value(
-      METRIC_PREFIX.format(self._group_name(DEFAULT_RESOURCE_POOL, group_name_suffix)))
+      METRIC_PREFIX.format(self._group_name(DEFAULT_RESOURCE_POOL, group_name_prefix)))
 
   def _assert_eventually_in_profile(self, query_handle, expected_str):
     """Assert with a timeout of 60 sec and a polling interval of 1 sec that the
@@ -604,10 +615,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
     llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml")
     # Start with a regular admission config with multiple pools and no resource limits.
     self._restart_coordinators(num_coordinators=2,
-                               extra_args="-vmodule admission-controller=3 "
-                                          "-fair_scheduler_allocation_path %s "
-                                          "-llama_site_path %s" % (
-                                            fs_allocation_path, llama_site_path))
+         extra_args="-vmodule admission-controller=3 "
+                    "-expected_executor_group_sets=root.queue1:2,root.queue2:1 "
+                    "-fair_scheduler_allocation_path %s "
+                    "-llama_site_path %s" % (
+                      fs_allocation_path, llama_site_path))
 
     # Create fresh clients
     second_coord_client = self.create_client_for_nth_impalad(1)
@@ -619,6 +631,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._add_executor_group("group", 1, admission_control_slots=1,
                              resource_pool="root.queue2", extra_args="-mem_limit=2g")
     assert self._get_num_executor_groups(only_healthy=True) == 2
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue1") == 1
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue2") == 1
 
     # Execute a long running query on group 'queue1'
     self.client.set_configuration({'request_pool': 'queue1'})
@@ -663,3 +679,37 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
     self.client.close()
     second_coord_client.close()
+
+  @pytest.mark.execute_serially
+  def test_per_exec_group_set_metrics(self):
+    """This test verifies that the metrics for each exec group set are updated
+    appropriately."""
+    self._restart_coordinators(num_coordinators=1,
+         extra_args="-expected_executor_group_sets=root.queue1:2,root.queue2:1")
+
+  # Add an unhealthy exec group in the queue1 group set
+    self._add_executor_group("group", 2, num_executors=1,
+                             resource_pool="root.queue1", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue1") == 0
+    assert self._get_num_executor_groups(exec_group_set_prefix="root.queue1") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.group-set.backends.total.root.queue1") == 1
+
+  # Add another executor to the previous group to make it healthy again
+    self._add_executor_group("group", 2, num_executors=1,
+                             resource_pool="root.queue1", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue1") == 1
+    assert self._get_num_executor_groups(exec_group_set_prefix="root.queue1") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.group-set.backends.total.root.queue1") == 2
+
+  # Add a healthy exec group in the queue2 group set
+    self._add_executor_group("group", 1,
+                             resource_pool="root.queue2", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True,
+                                         exec_group_set_prefix="root.queue2") == 1
+    assert self._get_num_executor_groups(exec_group_set_prefix="root.queue2") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.group-set.backends.total.root.queue2") == 1

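Outside the test framework, the new per-group-set keys can be read like any other impalad metric. A minimal sketch, assuming the coordinator's debug web server listens on its default port 25000 and serves the flat JSON metric map at /jsonmetrics; verify both against your deployment:

    # Sketch only: poll one of the new per-group-set metrics from a coordinator.
    import json
    from urllib.request import urlopen

    def get_metric(host, key, port=25000):
        # /jsonmetrics returns a flat {metric key: value} JSON object.
        with urlopen("http://{0}:{1}/jsonmetrics".format(host, port)) as resp:
            return json.load(resp).get(key)

    print(get_metric("localhost",
                     "cluster-membership.group-set.backends.total.root.queue1"))
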
[impala] 01/02: IMPALA-11075: Use latest API for get_partitions_ps_with_auth()

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 402c12b5d3394df4381bfe1604c28272dd247581
Author: Steve Carlin <sc...@cloudera.com>
AuthorDate: Tue Jan 18 19:09:45 2022 -0800

    IMPALA-11075: Use latest API for get_partitions_ps_with_auth()
    
    Change the code to use the latest API for get_partitions_ps_with_auth().
    
    Change-Id: I3e93d97f97b3a4f6dd36a40476f55f430f74d6ce
    Reviewed-on: http://gerrit.cloudera.org:8080/18157
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/catalog/metastore/MetastoreServiceHandler.java | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index 48e6c3c..0b624c7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -1415,14 +1415,7 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor
      GetPartitionsPsWithAuthRequest req)
      throws MetaException, NoSuchObjectException, TException {
    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
-     List<Partition> partitions = client.getHiveClient().getThriftClient()
-         .get_partitions_ps_with_auth(MetaStoreUtils
-                 .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
-             req.getTblName(), req.getPartVals(), req.getMaxParts(),
-             req.getUserName(), req.getGroupNames());
-     GetPartitionsPsWithAuthResponse res = new GetPartitionsPsWithAuthResponse();
-     res.setPartitions(partitions);
-     return res;
+     return client.getHiveClient().getThriftClient().get_partitions_ps_with_auth_req(req);
    }
  }