You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/06 04:18:55 UTC

[impala] 02/04: IMPALA-8806: Add metrics to improve observability of executor groups

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5193f36de05218f90581d29dd71d7e570e38b46
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Mon Jul 29 16:58:11 2019 -0700

    IMPALA-8806: Add metrics to improve observability of executor groups
    
    This patch adds 3 metrics under a new metric group called
    "cluster-membership". They track the number of executor groups that
    have at least one live executor, the number of executor groups that are
    in a healthy state, and the number of backends registered with the
    statestore.
    
    Testing:
    Modified tests to use these metrics for verification.
    
    Change-Id: I7745ea1c7c6778d3fb5e59adbc873697beb0f3b9
    Reviewed-on: http://gerrit.cloudera.org:8080/13979
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                       |  2 +-
 be/src/scheduling/cluster-membership-mgr-test.cc | 12 +++--
 be/src/scheduling/cluster-membership-mgr.cc      | 38 +++++++++++--
 be/src/scheduling/cluster-membership-mgr.h       | 13 ++++-
 be/src/scheduling/scheduler-test-util.cc         |  8 +--
 be/src/service/impala-server.cc                  |  8 ++-
 common/thrift/metrics.json                       | 29 ++++++++++
 tests/custom_cluster/test_auto_scaling.py        | 68 ++++++++++++++----------
 tests/custom_cluster/test_executor_groups.py     | 47 +++++++++++++---
 9 files changed, 177 insertions(+), 48 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 44ca63a..53a231f 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -217,7 +217,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   }
 
   cluster_membership_mgr_.reset(new ClusterMembershipMgr(
-      statestore_subscriber_->id(), statestore_subscriber_.get()));
+      statestore_subscriber_->id(), statestore_subscriber_.get(), metrics_.get()));
 
   admission_controller_.reset(
       new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
index f196cc3..e3e24d1 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -26,6 +26,7 @@
 #include "service/impala-server.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "util/metrics.h"
 
 using std::mt19937;
 using std::uniform_int_distribution;
@@ -69,6 +70,7 @@ class ClusterMembershipMgrTest : public testing::Test {
   /// A struct to hold information related to a simulated backend during the test.
   struct Backend {
     string backend_id;
+    std::unique_ptr<MetricGroup> metric_group;
     std::unique_ptr<ClusterMembershipMgr> cmm;
     std::shared_ptr<TBackendDescriptor> desc;
   };
@@ -175,7 +177,9 @@ class ClusterMembershipMgrTest : public testing::Test {
   /// this method.
   void CreateCMM(Backend* be) {
     ASSERT_TRUE(IsInVector(be, offline_));
-    be->cmm = make_unique<ClusterMembershipMgr>(be->backend_id, nullptr);
+    be->metric_group = make_unique<MetricGroup>("test");
+    be->cmm = make_unique<ClusterMembershipMgr>(
+        be->backend_id, nullptr, be->metric_group.get());
     RemoveFromVector(be, &offline_);
     starting_.push_back(be);
   }
@@ -268,8 +272,10 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   auto b1 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(1));
   auto b2 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(2));
 
-  ClusterMembershipMgr cmm1(b1->address.hostname, nullptr);
-  ClusterMembershipMgr cmm2(b2->address.hostname, nullptr);
+  MetricGroup tmp_metrics1("test-metrics1");
+  MetricGroup tmp_metrics2("test-metrics2");
+  ClusterMembershipMgr cmm1(b1->address.hostname, nullptr, &tmp_metrics1);
+  ClusterMembershipMgr cmm2(b2->address.hostname, nullptr, &tmp_metrics2);
 
   const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index fe3f98c..e81905e 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -19,6 +19,7 @@
 
 #include "common/logging.h"
 #include "common/names.h"
+#include "util/metrics.h"
 #include "util/test-info.h"
 
 namespace {
@@ -44,12 +45,22 @@ ExecutorGroup* FindOrInsertExecutorGroup(const TExecutorGroupDesc& group,
 
 namespace impala {
 
-ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
-    StatestoreSubscriber* subscriber) :
-    current_membership_(std::make_shared<const Snapshot>()),
+static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total");
+static const string HEALTHY_EXEC_GROUP_KEY(
+    "cluster-membership.executor-groups.total-healthy");
+static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
+
+ClusterMembershipMgr::ClusterMembershipMgr(
+    string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
+  : current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
     thrift_serializer_(/* compact= */ false),
     local_backend_id_(move(local_backend_id)) {
+  DCHECK(metrics != nullptr);
+  MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
+  total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
+  total_healthy_executor_groups_ = metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
+  total_backends_ = metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
 }
 
 Status ClusterMembershipMgr::Init() {
@@ -311,6 +322,8 @@ void ClusterMembershipMgr::UpdateMembership(
     DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
   }
 
+  UpdateMetrics(*new_backend_map, *new_executor_groups);
+
   // Don't send updates or update the current membership if the statestore is in its
   // post-recovery grace period.
   if (ss_is_recovering) {
@@ -527,4 +540,23 @@ bool ClusterMembershipMgr::CheckConsistency(const BackendIdMap& current_backends
   return true;
 }
 
+void ClusterMembershipMgr::UpdateMetrics(
+    const BackendIdMap& current_backends, const ExecutorGroups& executor_groups) {
+  int total_live_executor_groups = 0;
+  int total_healthy_executor_groups = 0;
+  for (const auto& group_it : executor_groups) {
+    const ExecutorGroup& group = group_it.second;
+    if (group.IsHealthy()) {
+      total_live_executor_groups++;
+      total_healthy_executor_groups++;
+    } else if (group.NumHosts() > 0) {
+      total_live_executor_groups++;
+    }
+  }
+  DCHECK_GE(total_live_executor_groups, total_healthy_executor_groups);
+  total_live_executor_groups_->SetValue(total_live_executor_groups);
+  total_healthy_executor_groups_->SetValue(total_healthy_executor_groups);
+  total_backends_->SetValue(current_backends.size());
+}
+
 } // end namespace impala
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index ec11af3..8a09df0 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -32,6 +32,7 @@
 #include "scheduling/executor-group.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/container-util.h"
+#include "util/metrics-fwd.h"
 
 namespace impala {
 
@@ -127,7 +128,8 @@ class ClusterMembershipMgr {
   /// locks are held when calling this callback.
   typedef std::function<Status(const TUpdateExecutorMembershipRequest&)> UpdateFrontendFn;
 
-  ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber);
+  ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber,
+      MetricGroup* metrics);
 
   /// Initializes instances of this class. This only sets up the statestore subscription.
   /// Callbacks to the local ImpalaServer and Frontend must be registered in separate
@@ -205,11 +207,20 @@ class ClusterMembershipMgr {
   bool CheckConsistency(const BackendIdMap& current_backends,
       const ExecutorGroups& executor_groups, const ExecutorBlacklist& executor_blacklist);
 
+  /// Updates the membership metrics.
+  void UpdateMetrics(const BackendIdMap& current_backends,
+      const ExecutorGroups& executor_groups);
+
   /// Ensures that only one thread is processing a membership update at a time, either
   /// from a statestore update or a blacklisting decision. Must be taken before any other
   /// locks in this class.
   boost::mutex update_membership_lock_;
 
+  /// Membership metrics
+  IntCounter* total_live_executor_groups_ = nullptr;
+  IntCounter* total_healthy_executor_groups_ = nullptr;
+  IntCounter* total_backends_ = nullptr;
+
   /// The snapshot of the current cluster membership. When receiving changes to the
   /// executors configuration from the statestore we will make a copy of the stored
   /// object, apply the updates to the copy and atomically swap the contents of this
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 09a87fb..e0cd7f3 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -694,10 +694,10 @@ void SchedulerWrapper::InitializeScheduler() {
                                            << "hosts.";
   const Host& scheduler_host = plan_.cluster().hosts()[0];
   string scheduler_backend_id = scheduler_host.ip;
-  cluster_membership_mgr_.reset(new ClusterMembershipMgr(scheduler_backend_id, nullptr));
-  cluster_membership_mgr_->SetLocalBeDescFn([scheduler_host]() {
-      return BuildBackendDescriptor(scheduler_host);
-  });
+  cluster_membership_mgr_.reset(
+      new ClusterMembershipMgr(scheduler_backend_id, nullptr, &metrics_));
+  cluster_membership_mgr_->SetLocalBeDescFn(
+      [scheduler_host]() { return BuildBackendDescriptor(scheduler_host); });
   Status status = cluster_membership_mgr_->Init();
   DCHECK(status.ok()) << "Cluster membership manager init failed in test";
   scheduler_.reset(new Scheduler(&metrics_, nullptr));
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d2f44d4..511e3c5 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -242,8 +242,12 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
     "queries from clients. If false, it will refuse client connections.");
 DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
     "fragments.");
-DEFINE_string(executor_groups, "", "List of executor groups, separated by comma. "
-    "Currently only a single group may be specified.");
+DEFINE_string(executor_groups, "",
+    "List of executor groups, separated by comma. Each executor group specification can "
+    "optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
+    "default-pool-1:3. Default minimum size is 1. Only when the cluster membership "
+    "contains at least that number of executors for the group will it be considered "
+    "healthy for admission. Currently only a single group may be specified.");
 
 // TODO: can we automatically choose a startup grace period based on the max admission
 // control queue timeout + some margin for error?
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 2a0ee30..bd30674 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2441,5 +2441,34 @@
     "units": "NONE",
     "kind": "GAUGE",
     "key": "events-processor.events-received-15min-rate"
+  },
+  {
+    "description": "Total number of executor groups that have at least one executor",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups that have at least one executor",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.executor-groups.total"
+  },
+  {
+    "description": "Total number of executor groups that are in a healthy state, that is, have at least the configured minimum number of executors to be considered for admission",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of executor groups that are in a healthy state",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.executor-groups.total-healthy"
+  },{
+    "description": "Total number of backends registered with the statestore",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Total number of backends registered with the statestore",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "cluster-membership.backends.total"
   }
 ]
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 3c5ecca..2bc4600 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -42,8 +42,8 @@ class TestAutoScaling(CustomClusterTestSuite):
   def _get_total_admitted_queries(self):
     return self.impalad_test_service.get_total_admitted_queries("default-pool")
 
-  def _get_num_executors(self):
-    return self.impalad_test_service.get_num_known_live_backends(only_executors=True)
+  def _get_num_backends(self):
+    return self.impalad_test_service.get_metric_value("cluster-membership.backends.total")
 
   def _get_num_running_queries(self):
     return self.impalad_test_service.get_num_running_queries("default-pool")
@@ -67,9 +67,12 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.start()
 
       # Wait for workers to spin up
-      assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
+      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
+      assert any(self._get_num_backends() >= cluster_size or sleep(1)
                  for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
           "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total-healthy") >= 1
 
       # Wait until we admitted at least 10 queries
       assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -77,26 +80,30 @@ class TestAutoScaling(CustomClusterTestSuite):
           "Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S
 
       # Wait for second executor group to start
-      num_expected = 2 * GROUP_SIZE
-      assert any(self._get_num_executors() == num_expected or sleep(1)
+      cluster_size = (2 * GROUP_SIZE) + 1
+      assert any(self._get_num_backends() >= cluster_size or sleep(1)
                  for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
                      "Number of backends did not reach %s within %s s" % (
-                     num_expected, self.STATE_CHANGE_TIMEOUT_S)
+                     cluster_size, self.STATE_CHANGE_TIMEOUT_S)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total-healthy") >= 2
 
       # Wait for query rate to surpass the maximum for a single executor group plus 20%
       min_query_rate = 1.2 * EXECUTOR_SLOTS
       assert any(workload.get_query_rate() > min_query_rate or sleep(1)
                  for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
                      "Query rate did not surpass %s within %s s" % (
-                     num_expected, self.STATE_CHANGE_TIMEOUT_S)
+                     cluster_size, self.STATE_CHANGE_TIMEOUT_S)
 
       LOG.info("Stopping workload")
       workload.stop()
 
       # Wait for workers to spin down
-      assert any(self._get_num_executors() == 0 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total") == 0
 
     finally:
       if workload:
@@ -122,9 +129,10 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.start()
 
       # Wait for workers to spin up
-      assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", cluster_size,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
 
       # Wait until we admitted at least 10 queries
       assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -144,15 +152,18 @@ class TestAutoScaling(CustomClusterTestSuite):
           "Unexpected number of running queries: %s" % num_running
 
       # Check that only a single group started
-      assert self._get_num_executors() == GROUP_SIZE
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total-healthy") == 1
 
       LOG.info("Stopping workload")
       workload.stop()
 
       # Wait for workers to spin down
-      assert any(self._get_num_executors() == 0 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total") == 0
 
     finally:
       if workload:
@@ -179,22 +190,23 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.start()
 
       # Wait for first executor to start up
-      assert any(self._get_num_executors() >= 1 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.executor-groups.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
 
       # Wait for remaining executors to start up and make sure that no queries are
       # admitted during startup
       end_time = time() + self.STATE_CHANGE_TIMEOUT_S
       startup_complete = False
+      cluster_size = GROUP_SIZE + 1  # +1 to include coordinator.
       while time() < end_time:
         num_admitted = self._get_total_admitted_queries()
-        num_backends = self._get_num_executors()
-        if num_backends < GROUP_SIZE:
+        num_backends = self._get_num_backends()
+        if num_backends < cluster_size:
           assert num_admitted == 0, "%s/%s backends started but %s queries have " \
-              "already been admitted." % (num_backends, GROUP_SIZE, num_admitted)
+              "already been admitted." % (num_backends, cluster_size, num_admitted)
         if num_admitted > 0:
-          assert num_backends == GROUP_SIZE
+          assert num_backends == cluster_size
           startup_complete = True
           break
         sleep(1)
@@ -205,9 +217,11 @@ class TestAutoScaling(CustomClusterTestSuite):
       workload.stop()
 
       # Wait for workers to spin down
-      assert any(self._get_num_executors() == 0 or sleep(1)
-                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
-          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+      self.impalad_test_service.wait_for_metric_value(
+        "cluster-membership.backends.total", 1,
+        timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+      assert self.impalad_test_service.get_metric_value(
+        "cluster-membership.executor-groups.total") == 0
 
     finally:
       if workload:
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 5a5a4b1..9c0315b 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -42,6 +42,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self.num_groups = 1
     self.num_executors = 1
     super(TestExecutorGroups, self).setup_method(method)
+    self.coordinator = self.cluster.impalads[0]
 
   def _group_name(self, name):
     # By convention, group names must start with their associated resource pool name
@@ -88,6 +89,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     """Tests that a query submitted to a coordinator with no executor group times out."""
     result = self.execute_query_expect_failure(self.client, "select sleep(2)")
     assert "Admission for query exceeded timeout" in str(result)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
 
   @pytest.mark.execute_serially
   def test_single_group(self):
@@ -95,6 +98,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     QUERY = "select count(*) from functional.alltypestiny"
     self._add_executor_group("group1", 2)
     self.execute_query_expect_success(self.client, QUERY)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
 
   @pytest.mark.execute_serially
   def test_executor_group_starts_while_qeueud(self):
@@ -105,8 +110,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
     handle = client.execute_async(QUERY)
     profile = client.get_runtime_profile(handle)
     assert "No healthy executor groups found for pool" in profile
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     self._add_executor_group("group1", 2)
     client.wait_for_finished_timeout(handle, 20)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
 
   @pytest.mark.execute_serially
   def test_executor_group_health(self):
@@ -114,14 +123,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
     QUERY = "select count(*) from functional.alltypestiny"
     # Start cluster and group
     self._add_executor_group("group1", 2)
+    self.coordinator.service.wait_for_metric_value(
+      "cluster-membership.executor-groups.total-healthy", 1)
     client = self.client
     # Run query to validate
     self.execute_query_expect_success(client, QUERY)
     # Kill an executor
-    coordinator = self.cluster.impalads[0]
     executor = self.cluster.impalads[1]
     executor.kill()
-    coordinator.service.wait_for_num_known_live_backends(2)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
+                                                   timeout=20)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     # Run query and observe timeout
     handle = client.execute_async(QUERY)
     profile = client.get_runtime_profile(handle)
@@ -132,6 +145,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.wait_for_finished_timeout(handle, 20)
     # Run query and observe success
     self.execute_query_expect_success(client, QUERY)
+    assert self.coordinator.service.wait_for_metric_value(
+      "cluster-membership.executor-groups.total-healthy", 1)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests=1")
@@ -146,19 +161,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
     profile = client.get_runtime_profile(q2)
     assert "Initial admission queue reason: number of running queries" in profile, profile
     # Kill an executor
-    coordinator = self.cluster.impalads[0]
     executor = self.cluster.impalads[1]
     executor.kill()
-    coordinator.service.wait_for_num_known_live_backends(2)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
     # Wait for q1 to finish (sleep runs on the coordinator)
     client.wait_for_finished_timeout(q1, 20)
     # Check that q2 still hasn't run
     profile = client.get_runtime_profile(q2)
     assert "Admission result: Queued" in profile, profile
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     # Restore executor group health
     executor.start()
     # Query should now finish
     client.wait_for_finished_timeout(q2, 20)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
 
   @pytest.mark.execute_serially
   def test_max_concurrent_queries(self):
@@ -184,6 +202,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
              where month < 3 and id + random() < sleep(500);"
     self._add_executor_group("group1", 2, max_concurrent_queries=1)
     self._add_executor_group("group2", 2, max_concurrent_queries=1)
+    self.coordinator.service.wait_for_metric_value(
+      "cluster-membership.executor-groups.total-healthy", 2)
     client = self.client
     q1 = client.execute_async(QUERY)
     client.wait_for_admission_control(q1)
@@ -257,6 +277,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
     QUERY = "select sleep(4)"
     # Start first executor
     self._add_executor_group("group1", 3, num_executors=1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total") == 1
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     # Run query and observe that it gets queued
     client = self.client
     handle = client.execute_async(QUERY)
@@ -266,10 +291,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
     initial_state = client.get_state(handle)
     # Start another executor and observe that the query stays queued
     self._add_executor_group("group1", 3, num_executors=1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 3)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0
     profile = client.get_runtime_profile(handle)
     assert client.get_state(handle) == initial_state
     # Start the remaining executor and observe that the query finishes
     self._add_executor_group("group1", 3, num_executors=1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 4)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
     client.wait_for_finished_timeout(handle, 20)
 
   @pytest.mark.execute_serially
@@ -283,16 +314,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Run query to make sure things work
     QUERY = "select count(*) from functional.alltypestiny"
     self.execute_query_expect_success(self.client, QUERY)
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 1
     # Kill executors to make group empty
     impalads = self.cluster.impalads
     impalads[1].kill()
     impalads[2].kill()
-    impalads[0].service.wait_for_num_known_live_backends(1)
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
     # Run query to make sure it times out
     result = self.execute_query_expect_failure(self.client, QUERY)
-    print str(result)
     expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
                      "pool default-pool. Queued reason: No healthy executor groups " \
                      "found for pool default-pool."
     assert expected_error in str(result)
-
+    assert self.coordinator.service.get_metric_value(
+      "cluster-membership.executor-groups.total-healthy") == 0