You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/06 04:18:55 UTC
[impala] 02/04: IMPALA-8806: Add metrics to improve observability
of executor groups
This is an automated email from the ASF dual-hosted git repository.
tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b5193f36de05218f90581d29dd71d7e570e38b46
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Mon Jul 29 16:58:11 2019 -0700
IMPALA-8806: Add metrics to improve observability of executor groups
This patch adds three metrics under a new metric group called
"cluster-membership". They track the number of executor groups that
have at least one live executor (cluster-membership.executor-groups.total),
the number of executor groups that are in a healthy state
(cluster-membership.executor-groups.total-healthy), and the number of
backends registered with the statestore (cluster-membership.backends.total).
Testing:
Modified tests to use these metrics for verification.
Change-Id: I7745ea1c7c6778d3fb5e59adbc873697beb0f3b9
Reviewed-on: http://gerrit.cloudera.org:8080/13979
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/exec-env.cc | 2 +-
be/src/scheduling/cluster-membership-mgr-test.cc | 12 +++--
be/src/scheduling/cluster-membership-mgr.cc | 38 +++++++++++--
be/src/scheduling/cluster-membership-mgr.h | 13 ++++-
be/src/scheduling/scheduler-test-util.cc | 8 +--
be/src/service/impala-server.cc | 8 ++-
common/thrift/metrics.json | 29 ++++++++++
tests/custom_cluster/test_auto_scaling.py | 68 ++++++++++++++----------
tests/custom_cluster/test_executor_groups.py | 47 +++++++++++++---
9 files changed, 177 insertions(+), 48 deletions(-)
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 44ca63a..53a231f 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -217,7 +217,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
}
cluster_membership_mgr_.reset(new ClusterMembershipMgr(
- statestore_subscriber_->id(), statestore_subscriber_.get()));
+ statestore_subscriber_->id(), statestore_subscriber_.get(), metrics_.get()));
admission_controller_.reset(
new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
index f196cc3..e3e24d1 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -26,6 +26,7 @@
#include "service/impala-server.h"
#include "testutil/gtest-util.h"
#include "testutil/rand-util.h"
+#include "util/metrics.h"
using std::mt19937;
using std::uniform_int_distribution;
@@ -69,6 +70,7 @@ class ClusterMembershipMgrTest : public testing::Test {
/// A struct to hold information related to a simulated backend during the test.
struct Backend {
string backend_id;
+ std::unique_ptr<MetricGroup> metric_group;
std::unique_ptr<ClusterMembershipMgr> cmm;
std::shared_ptr<TBackendDescriptor> desc;
};
@@ -175,7 +177,9 @@ class ClusterMembershipMgrTest : public testing::Test {
/// this method.
void CreateCMM(Backend* be) {
ASSERT_TRUE(IsInVector(be, offline_));
- be->cmm = make_unique<ClusterMembershipMgr>(be->backend_id, nullptr);
+ be->metric_group = make_unique<MetricGroup>("test");
+ be->cmm = make_unique<ClusterMembershipMgr>(
+ be->backend_id, nullptr, be->metric_group.get());
RemoveFromVector(be, &offline_);
starting_.push_back(be);
}
@@ -268,8 +272,10 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
auto b1 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(1));
auto b2 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(2));
- ClusterMembershipMgr cmm1(b1->address.hostname, nullptr);
- ClusterMembershipMgr cmm2(b2->address.hostname, nullptr);
+ MetricGroup tmp_metrics1("test-metrics1");
+ MetricGroup tmp_metrics2("test-metrics2");
+ ClusterMembershipMgr cmm1(b1->address.hostname, nullptr, &tmp_metrics1);
+ ClusterMembershipMgr cmm2(b2->address.hostname, nullptr, &tmp_metrics2);
const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index fe3f98c..e81905e 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -19,6 +19,7 @@
#include "common/logging.h"
#include "common/names.h"
+#include "util/metrics.h"
#include "util/test-info.h"
namespace {
@@ -44,12 +45,22 @@ ExecutorGroup* FindOrInsertExecutorGroup(const TExecutorGroupDesc& group,
namespace impala {
-ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
- StatestoreSubscriber* subscriber) :
- current_membership_(std::make_shared<const Snapshot>()),
+static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total");
+static const string HEALTHY_EXEC_GROUP_KEY(
+ "cluster-membership.executor-groups.total-healthy");
+static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
+
+ClusterMembershipMgr::ClusterMembershipMgr(
+ string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
+ : current_membership_(std::make_shared<const Snapshot>()),
statestore_subscriber_(subscriber),
thrift_serializer_(/* compact= */ false),
local_backend_id_(move(local_backend_id)) {
+ DCHECK(metrics != nullptr);
+ MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
+ total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
+ total_healthy_executor_groups_ = metric_grp->AddCounter(HEALTHY_EXEC_GROUP_KEY, 0);
+ total_backends_ = metric_grp->AddCounter(TOTAL_BACKENDS_KEY, 0);
}
Status ClusterMembershipMgr::Init() {
@@ -311,6 +322,8 @@ void ClusterMembershipMgr::UpdateMembership(
DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
}
+ UpdateMetrics(*new_backend_map, *new_executor_groups);
+
// Don't send updates or update the current membership if the statestore is in its
// post-recovery grace period.
if (ss_is_recovering) {
@@ -527,4 +540,23 @@ bool ClusterMembershipMgr::CheckConsistency(const BackendIdMap& current_backends
return true;
}
+void ClusterMembershipMgr::UpdateMetrics(
+ const BackendIdMap& current_backends, const ExecutorGroups& executor_groups) {
+ int total_live_executor_groups = 0;
+ int total_healthy_executor_groups = 0;
+ for (const auto& group_it : executor_groups) {
+ const ExecutorGroup& group = group_it.second;
+ if (group.IsHealthy()) {
+ total_live_executor_groups++;
+ total_healthy_executor_groups++;
+ } else if (group.NumHosts() > 0) {
+ total_live_executor_groups++;
+ }
+ }
+ DCHECK_GE(total_live_executor_groups, total_healthy_executor_groups);
+ total_live_executor_groups_->SetValue(total_live_executor_groups);
+ total_healthy_executor_groups_->SetValue(total_healthy_executor_groups);
+ total_backends_->SetValue(current_backends.size());
+}
+
} // end namespace impala
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index ec11af3..8a09df0 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -32,6 +32,7 @@
#include "scheduling/executor-group.h"
#include "statestore/statestore-subscriber.h"
#include "util/container-util.h"
+#include "util/metrics-fwd.h"
namespace impala {
@@ -127,7 +128,8 @@ class ClusterMembershipMgr {
/// locks are held when calling this callback.
typedef std::function<Status(const TUpdateExecutorMembershipRequest&)> UpdateFrontendFn;
- ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber);
+ ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber,
+ MetricGroup* metrics);
/// Initializes instances of this class. This only sets up the statestore subscription.
/// Callbacks to the local ImpalaServer and Frontend must be registered in separate
@@ -205,11 +207,20 @@ class ClusterMembershipMgr {
bool CheckConsistency(const BackendIdMap& current_backends,
const ExecutorGroups& executor_groups, const ExecutorBlacklist& executor_blacklist);
+ /// Updates the membership metrics.
+ void UpdateMetrics(const BackendIdMap& current_backends,
+ const ExecutorGroups& executor_groups);
+
/// Ensures that only one thread is processing a membership update at a time, either
/// from a statestore update or a blacklisting decision. Must be taken before any other
/// locks in this class.
boost::mutex update_membership_lock_;
+ /// Membership metrics
+ IntCounter* total_live_executor_groups_ = nullptr;
+ IntCounter* total_healthy_executor_groups_ = nullptr;
+ IntCounter* total_backends_ = nullptr;
+
/// The snapshot of the current cluster membership. When receiving changes to the
/// executors configuration from the statestore we will make a copy of the stored
/// object, apply the updates to the copy and atomically swap the contents of this
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 09a87fb..e0cd7f3 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -694,10 +694,10 @@ void SchedulerWrapper::InitializeScheduler() {
<< "hosts.";
const Host& scheduler_host = plan_.cluster().hosts()[0];
string scheduler_backend_id = scheduler_host.ip;
- cluster_membership_mgr_.reset(new ClusterMembershipMgr(scheduler_backend_id, nullptr));
- cluster_membership_mgr_->SetLocalBeDescFn([scheduler_host]() {
- return BuildBackendDescriptor(scheduler_host);
- });
+ cluster_membership_mgr_.reset(
+ new ClusterMembershipMgr(scheduler_backend_id, nullptr, &metrics_));
+ cluster_membership_mgr_->SetLocalBeDescFn(
+ [scheduler_host]() { return BuildBackendDescriptor(scheduler_host); });
Status status = cluster_membership_mgr_->Init();
DCHECK(status.ok()) << "Cluster membership manager init failed in test";
scheduler_.reset(new Scheduler(&metrics_, nullptr));
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d2f44d4..511e3c5 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -242,8 +242,12 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
"queries from clients. If false, it will refuse client connections.");
DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
"fragments.");
-DEFINE_string(executor_groups, "", "List of executor groups, separated by comma. "
- "Currently only a single group may be specified.");
+DEFINE_string(executor_groups, "",
+ "List of executor groups, separated by comma. Each executor group specification can "
+ "optionally contain a minimum size, separated by a ':', e.g. --executor_groups "
+ "default-pool-1:3. Default minimum size is 1. Only when the cluster membership "
+ "contains at least that number of executors for the group will it be considered "
+ "healthy for admission. Currently only a single group may be specified.");
// TODO: can we automatically choose a startup grace period based on the max admission
// control queue timeout + some margin for error?
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 2a0ee30..bd30674 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2441,5 +2441,34 @@
"units": "NONE",
"kind": "GAUGE",
"key": "events-processor.events-received-15min-rate"
+ },
+ {
+ "description": "Total number of executor groups that have at least one executor",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Total number of executor groups that have at least one executor",
+ "units": "NONE",
+ "kind": "COUNTER",
+ "key": "cluster-membership.executor-groups.total"
+ },
+ {
+ "description": "Total number of executor groups that are in a healthy state, that is, have at least the configured minimum number of executors to be considered for admission",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Total number of executor groups that are in a healthy state",
+ "units": "NONE",
+ "kind": "COUNTER",
+ "key": "cluster-membership.executor-groups.total-healthy"
+ },{
+ "description": "Total number of backends registered with the statestore",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Total number of backends registered with the statestore",
+ "units": "NONE",
+ "kind": "COUNTER",
+ "key": "cluster-membership.backends.total"
}
]
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 3c5ecca..2bc4600 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -42,8 +42,8 @@ class TestAutoScaling(CustomClusterTestSuite):
def _get_total_admitted_queries(self):
return self.impalad_test_service.get_total_admitted_queries("default-pool")
- def _get_num_executors(self):
- return self.impalad_test_service.get_num_known_live_backends(only_executors=True)
+ def _get_num_backends(self):
+ return self.impalad_test_service.get_metric_value("cluster-membership.backends.total")
def _get_num_running_queries(self):
return self.impalad_test_service.get_num_running_queries("default-pool")
@@ -67,9 +67,12 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.start()
# Wait for workers to spin up
- assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
+ cluster_size = GROUP_SIZE + 1 # +1 to include coordinator.
+ assert any(self._get_num_backends() >= cluster_size or sleep(1)
for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
"Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") >= 1
# Wait until we admitted at least 10 queries
assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -77,26 +80,30 @@ class TestAutoScaling(CustomClusterTestSuite):
"Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S
# Wait for second executor group to start
- num_expected = 2 * GROUP_SIZE
- assert any(self._get_num_executors() == num_expected or sleep(1)
+ cluster_size = (2 * GROUP_SIZE) + 1
+ assert any(self._get_num_backends() >= cluster_size or sleep(1)
for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
"Number of backends did not reach %s within %s s" % (
- num_expected, self.STATE_CHANGE_TIMEOUT_S)
+ cluster_size, self.STATE_CHANGE_TIMEOUT_S)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") >= 2
# Wait for query rate to surpass the maximum for a single executor group plus 20%
min_query_rate = 1.2 * EXECUTOR_SLOTS
assert any(workload.get_query_rate() > min_query_rate or sleep(1)
for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
"Query rate did not surpass %s within %s s" % (
- num_expected, self.STATE_CHANGE_TIMEOUT_S)
+ cluster_size, self.STATE_CHANGE_TIMEOUT_S)
LOG.info("Stopping workload")
workload.stop()
# Wait for workers to spin down
- assert any(self._get_num_executors() == 0 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 0
finally:
if workload:
@@ -122,9 +129,10 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.start()
# Wait for workers to spin up
- assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ cluster_size = GROUP_SIZE + 1 # +1 to include coordinator.
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", cluster_size,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
# Wait until we admitted at least 10 queries
assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
@@ -144,15 +152,18 @@ class TestAutoScaling(CustomClusterTestSuite):
"Unexpected number of running queries: %s" % num_running
# Check that only a single group started
- assert self._get_num_executors() == GROUP_SIZE
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
LOG.info("Stopping workload")
workload.stop()
# Wait for workers to spin down
- assert any(self._get_num_executors() == 0 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 0
finally:
if workload:
@@ -179,22 +190,23 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.start()
# Wait for first executor to start up
- assert any(self._get_num_executors() >= 1 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
# Wait for remaining executors to start up and make sure that no queries are
# admitted during startup
end_time = time() + self.STATE_CHANGE_TIMEOUT_S
startup_complete = False
+ cluster_size = GROUP_SIZE + 1 # +1 to include coordinator.
while time() < end_time:
num_admitted = self._get_total_admitted_queries()
- num_backends = self._get_num_executors()
- if num_backends < GROUP_SIZE:
+ num_backends = self._get_num_backends()
+ if num_backends < cluster_size:
assert num_admitted == 0, "%s/%s backends started but %s queries have " \
- "already been admitted." % (num_backends, GROUP_SIZE, num_admitted)
+ "already been admitted." % (num_backends, cluster_size, num_admitted)
if num_admitted > 0:
- assert num_backends == GROUP_SIZE
+ assert num_backends == cluster_size
startup_complete = True
break
sleep(1)
@@ -205,9 +217,11 @@ class TestAutoScaling(CustomClusterTestSuite):
workload.stop()
# Wait for workers to spin down
- assert any(self._get_num_executors() == 0 or sleep(1)
- for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
- "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+ self.impalad_test_service.wait_for_metric_value(
+ "cluster-membership.backends.total", 1,
+ timeout=self.STATE_CHANGE_TIMEOUT_S, interval=1)
+ assert self.impalad_test_service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 0
finally:
if workload:
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 5a5a4b1..9c0315b 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -42,6 +42,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
self.num_groups = 1
self.num_executors = 1
super(TestExecutorGroups, self).setup_method(method)
+ self.coordinator = self.cluster.impalads[0]
def _group_name(self, name):
# By convention, group names must start with their associated resource pool name
@@ -88,6 +89,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
"""Tests that a query submitted to a coordinator with no executor group times out."""
result = self.execute_query_expect_failure(self.client, "select sleep(2)")
assert "Admission for query exceeded timeout" in str(result)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
@pytest.mark.execute_serially
def test_single_group(self):
@@ -95,6 +98,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
QUERY = "select count(*) from functional.alltypestiny"
self._add_executor_group("group1", 2)
self.execute_query_expect_success(self.client, QUERY)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
@pytest.mark.execute_serially
def test_executor_group_starts_while_qeueud(self):
@@ -105,8 +110,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
handle = client.execute_async(QUERY)
profile = client.get_runtime_profile(handle)
assert "No healthy executor groups found for pool" in profile
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
self._add_executor_group("group1", 2)
client.wait_for_finished_timeout(handle, 20)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
@pytest.mark.execute_serially
def test_executor_group_health(self):
@@ -114,14 +123,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
QUERY = "select count(*) from functional.alltypestiny"
# Start cluster and group
self._add_executor_group("group1", 2)
+ self.coordinator.service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total-healthy", 1)
client = self.client
# Run query to validate
self.execute_query_expect_success(client, QUERY)
# Kill an executor
- coordinator = self.cluster.impalads[0]
executor = self.cluster.impalads[1]
executor.kill()
- coordinator.service.wait_for_num_known_live_backends(2)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
+ timeout=20)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
# Run query and observe timeout
handle = client.execute_async(QUERY)
profile = client.get_runtime_profile(handle)
@@ -132,6 +145,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
client.wait_for_finished_timeout(handle, 20)
# Run query and observe success
self.execute_query_expect_success(client, QUERY)
+ assert self.coordinator.service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total-healthy", 1)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests=1")
@@ -146,19 +161,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
profile = client.get_runtime_profile(q2)
assert "Initial admission queue reason: number of running queries" in profile, profile
# Kill an executor
- coordinator = self.cluster.impalads[0]
executor = self.cluster.impalads[1]
executor.kill()
- coordinator.service.wait_for_num_known_live_backends(2)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
# Wait for q1 to finish (sleep runs on the coordinator)
client.wait_for_finished_timeout(q1, 20)
# Check that q2 still hasn't run
profile = client.get_runtime_profile(q2)
assert "Admission result: Queued" in profile, profile
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
# Restore executor group health
executor.start()
# Query should now finish
client.wait_for_finished_timeout(q2, 20)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
@pytest.mark.execute_serially
def test_max_concurrent_queries(self):
@@ -184,6 +202,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
where month < 3 and id + random() < sleep(500);"
self._add_executor_group("group1", 2, max_concurrent_queries=1)
self._add_executor_group("group2", 2, max_concurrent_queries=1)
+ self.coordinator.service.wait_for_metric_value(
+ "cluster-membership.executor-groups.total-healthy", 2)
client = self.client
q1 = client.execute_async(QUERY)
client.wait_for_admission_control(q1)
@@ -257,6 +277,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
QUERY = "select sleep(4)"
# Start first executor
self._add_executor_group("group1", 3, num_executors=1)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total") == 1
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
# Run query and observe that it gets queued
client = self.client
handle = client.execute_async(QUERY)
@@ -266,10 +291,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
initial_state = client.get_state(handle)
# Start another executor and observe that the query stays queued
self._add_executor_group("group1", 3, num_executors=1)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 3)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0
profile = client.get_runtime_profile(handle)
assert client.get_state(handle) == initial_state
# Start the remaining executor and observe that the query finishes
self._add_executor_group("group1", 3, num_executors=1)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 4)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
client.wait_for_finished_timeout(handle, 20)
@pytest.mark.execute_serially
@@ -283,16 +314,18 @@ class TestExecutorGroups(CustomClusterTestSuite):
# Run query to make sure things work
QUERY = "select count(*) from functional.alltypestiny"
self.execute_query_expect_success(self.client, QUERY)
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 1
# Kill executors to make group empty
impalads = self.cluster.impalads
impalads[1].kill()
impalads[2].kill()
- impalads[0].service.wait_for_num_known_live_backends(1)
+ self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
# Run query to make sure it times out
result = self.execute_query_expect_failure(self.client, QUERY)
- print str(result)
expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
"pool default-pool. Queued reason: No healthy executor groups " \
"found for pool default-pool."
assert expected_error in str(result)
-
+ assert self.coordinator.service.get_metric_value(
+ "cluster-membership.executor-groups.total-healthy") == 0