You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/26 06:23:41 UTC
[impala] 04/04: IMPALA-8446: Create a unit test for Admission
Controller.
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2dda47ec27e8f15810907d77c6e41c275355cea6
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Mon Apr 22 09:52:28 2019 -0700
IMPALA-8446: Create a unit test for Admission Controller.
This test allows construction of white box tests that exercise Admission
Controller code.
The initial test cases are
(1) a test which does the following:
+ creates a RequestPoolService which reads some Admission Controller
configuration files
+ creates an Admission Controller object
+ creates a QuerySchedule for a query
+ tests if the query can be admitted by the Admission Controller
+ simulates activity in the cluster which consumes memory
+ tests that the earlier query cannot now be admitted by the Admission
Controller
(2) a test which verifies that the configuration files are read
correctly.
TESTING:
Ran end-to-end tests cleanly and checked that the new tests were
executed.
Change-Id: I8a840590b868f2df1a06f3f397b7b0fc2b29462c
Reviewed-on: http://gerrit.cloudera.org:8080/13078
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/scheduling/CMakeLists.txt | 1 +
be/src/scheduling/admission-controller-test.cc | 223 +++++++++++++++++++++++++
be/src/scheduling/admission-controller.cc | 18 +-
be/src/scheduling/admission-controller.h | 18 +-
be/src/scheduling/query-schedule.cc | 16 +-
be/src/scheduling/query-schedule.h | 4 +
6 files changed, 263 insertions(+), 17 deletions(-)
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 998a05c..a1ef269 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -37,3 +37,4 @@ add_dependencies(Scheduling gen-deps)
ADD_BE_LSAN_TEST(scheduler-test)
ADD_BE_LSAN_TEST(backend-config-test)
ADD_BE_LSAN_TEST(hash-ring-test)
+ADD_BE_LSAN_TEST(admission-controller-test)
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
new file mode 100644
index 0000000..c3fa309
--- /dev/null
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/admission-controller.h"
+
+#include "common/names.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
+#include "runtime/exec-env.h"
+#include "runtime/runtime-state.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+#include "testutil/gtest-util.h"
+
+// Access the flags that are defined in RequestPoolService.
+DECLARE_string(fair_scheduler_allocation_path);
+DECLARE_string(llama_site_path);
+
+namespace impala {
+
+// Root of the Impala source tree, taken from the IMPALA_HOME environment variable.
+// getenv() returns a null pointer when the variable is unset, and constructing a
+// string from a null pointer is undefined behavior, so fall back to an empty string.
+static const char* const IMPALA_HOME_ENV = getenv("IMPALA_HOME");
+static const string IMPALA_HOME(IMPALA_HOME_ENV != nullptr ? IMPALA_HOME_ENV : "");
+
+// Queues used in the configuration files fair-scheduler-test2.xml and
+// llama-site-test2.xml.
+static const string QUEUE_A = "root.queueA";
+static const string QUEUE_B = "root.queueB";
+static const string QUEUE_C = "root.queueC";
+
+// Host names
+static const string HOST_1 = "host1:25000";
+static const string HOST_2 = "host2:25000";
+
+/// Parent class for Admission Controller tests.
+/// Common code and constants should go here.
+class AdmissionControllerTest : public testing::Test {
+ protected:
+ /// Test environment created in SetUp().
+ boost::scoped_ptr<TestEnv> test_env_;
+
+ virtual void SetUp() {
+ // Establish a TestEnv so that ExecEnv works in tests.
+ test_env_.reset(new TestEnv);
+ ASSERT_OK(test_env_->Init());
+ }
+
+ virtual void TearDown() {}
+
+ /// Build a TTopicDelta object for IMPALA_REQUEST_QUEUE_TOPIC.
+ /// 'is_delta' is copied into the returned object's 'is_delta' field.
+ static TTopicDelta MakeTopicDelta(const bool is_delta) {
+ TTopicDelta delta;
+ delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+ delta.is_delta = is_delta;
+ return delta;
+ }
+
+ /// Build a TPoolStats object whose 'backend_mem_reserved', 'num_admitted_running' and
+ /// 'num_queued' fields are set from the arguments of the same names.
+ static TPoolStats MakePoolStats(const int backend_mem_reserved,
+ const int num_admitted_running, const int num_queued) {
+ TPoolStats stats;
+ stats.backend_mem_reserved = backend_mem_reserved;
+ stats.num_admitted_running = num_admitted_running;
+ stats.num_queued = num_queued;
+ return stats;
+ }
+
+ /// Add a TPoolStats to the TTopicDelta 'topic' with a key created from 'host' and
+ /// 'pool_name'. The stats are serialized into the value of the new topic item, which
+ /// is appended to 'topic->topic_entries'.
+ static void AddStatsToTopic(
+ TTopicDelta* topic, const string host, const string pool_name, TPoolStats stats) {
+ // Build topic item.
+ TTopicItem item;
+ item.key = AdmissionController::MakePoolTopicKey(pool_name, host);
+ ThriftSerializer serializer(false);
+ Status status = serializer.SerializeToString(&stats, &item.value);
+ // Serialization is expected to succeed for these hand-built stats.
+ DCHECK(status.ok());
+
+ // Add to the topic.
+ topic->topic_entries.push_back(item);
+ }
+
+ /// Build a TQueryExecRequest object that targets the request pool 'pool_name' and
+ /// carries 'per_host_mem_estimate' as its per-host memory estimate.
+ static TQueryExecRequest MakeQueryExecRequest(
+ const string pool_name, const int per_host_mem_estimate) {
+ TQueryExecRequest request;
+ request.query_ctx.request_pool = pool_name;
+ request.__set_per_host_mem_estimate(per_host_mem_estimate);
+ return request;
+ }
+
+ /// Check that PoolConfig can be read from a RequestPoolService, and that the
+ /// configured values are as expected.
+ /// 'min_query_mem_limit' and 'max_query_mem_limit' default to 0, so callers that do
+ /// not pass them assert that both limits are 0 in the pool's configuration.
+ static void CheckPoolConfig(RequestPoolService& request_pool_service,
+ const string pool_name, const int64_t max_requests, const int64_t max_mem_resources,
+ const int64_t queue_timeout_ms, const bool clamp_mem_limit_query_option,
+ const int64_t min_query_mem_limit = 0, const int64_t max_query_mem_limit = 0) {
+ TPoolConfig config;
+ ASSERT_OK(request_pool_service.GetPoolConfig(pool_name, &config));
+
+ ASSERT_EQ(max_requests, config.max_requests);
+ ASSERT_EQ(max_mem_resources, config.max_mem_resources);
+ ASSERT_EQ(queue_timeout_ms, config.queue_timeout_ms);
+ ASSERT_EQ(clamp_mem_limit_query_option, config.clamp_mem_limit_query_option);
+ ASSERT_EQ(min_query_mem_limit, config.min_query_mem_limit);
+ ASSERT_EQ(max_query_mem_limit, config.max_query_mem_limit);
+ }
+
+ /// Return the path of the configuration file in the test resources directory
+ /// that has name 'file_name'. The path is rooted at IMPALA_HOME.
+ static string GetResourceFile(const string& file_name) {
+ return Substitute("$0/fe/src/test/resources/$1", IMPALA_HOME, file_name);
+ }
+};
+
+/// Test that AdmissionController will admit a query into a pool, then simulate other
+/// work being added to the pool, and then test that the AdmissionController will no
+/// longer admit the query.
+/// This is a single threaded test so we access the internal data structures of
+/// the AdmissionController object, and call methods such as 'GetPoolStats' without
+/// taking the admission_ctrl_lock_ lock.
+TEST_F(AdmissionControllerTest, Simple) {
+ // Pass the paths of the configuration files as command line flags
+ FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
+ FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+
+ // Create a RequestPoolService which will read the configuration files.
+ MetricGroup metric_group("impala-metrics");
+ RequestPoolService request_pool_service(&metric_group);
+
+ // Create an AdmissionController running on host0.
+ TNetworkAddress addr = MakeNetworkAddress("host0", 25000);
+ AdmissionController admission_controller(
+ nullptr, &request_pool_service, &metric_group, addr);
+
+ // Get the PoolConfig for QUEUE_C ("root.queueC").
+ TPoolConfig config_c;
+ ASSERT_OK(request_pool_service.GetPoolConfig(QUEUE_C, &config_c));
+
+ // Create a QuerySchedule to run on QUEUE_C.
+ TQueryExecRequest request = MakeQueryExecRequest(QUEUE_C, 1000);
+ ObjectPool pool;
+ RuntimeProfile* profile = RuntimeProfile::Create(&pool, "pool1");
+ TUniqueId unique_id;
+ TQueryOptions query_options;
+ QuerySchedule query_schedule(unique_id, request, query_options, profile);
+ query_schedule.UpdateMemoryRequirements(config_c);
+
+ // Check that the AdmissionController initially has no data about other hosts.
+ ASSERT_EQ(0, admission_controller.host_mem_reserved_.size());
+
+ // Check that the query can be admitted. 'not_admitted_reason' is passed by address
+ // so that CanAdmitRequest() can fill in an explanation when it rejects the query.
+ string not_admitted_reason;
+ ASSERT_TRUE(admission_controller.CanAdmitRequest(
+ query_schedule, config_c, true /* admit_from_queue */, &not_admitted_reason));
+
+ // Make a TTopicDelta describing some activity on host1 and host2.
+ TTopicDelta membership = MakeTopicDelta(false);
+ AddStatsToTopic(&membership, HOST_1, QUEUE_B, MakePoolStats(1000, 1, 0));
+ AddStatsToTopic(&membership, HOST_1, QUEUE_C, MakePoolStats(5000, 10, 0));
+ AddStatsToTopic(&membership, HOST_2, QUEUE_C, MakePoolStats(5000, 1, 0));
+
+ // Imitate the StateStore passing updates on query activity to the
+ // AdmissionController.
+ StatestoreSubscriber::TopicDeltaMap incoming_topic_deltas;
+ incoming_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, membership);
+ vector<TTopicDelta> outgoing_topic_updates;
+ admission_controller.UpdatePoolStats(incoming_topic_deltas, &outgoing_topic_updates);
+
+ // Check that the AdmissionController has aggregated the remote stats.
+ ASSERT_EQ(3, admission_controller.host_mem_reserved_.size());
+ ASSERT_EQ(6000, admission_controller.host_mem_reserved_[HOST_1]);
+ ASSERT_EQ(5000, admission_controller.host_mem_reserved_[HOST_2]);
+
+ // Check the PoolStats for QUEUE_C
+ AdmissionController::PoolStats* pool_stats = admission_controller.GetPoolStats(QUEUE_C);
+ ASSERT_EQ(10000, pool_stats->agg_mem_reserved_);
+ ASSERT_EQ(11, pool_stats->agg_num_running_);
+
+ // Test that the query cannot be admitted now.
+ ASSERT_FALSE(admission_controller.CanAdmitRequest(
+ query_schedule, config_c, true /* admit_from_queue */, &not_admitted_reason));
+ EXPECT_STR_CONTAINS(
+ not_admitted_reason, "number of running queries 11 is at or over limit 10.");
+}
+
+/// Verify that RequestPoolService reads the test configuration files and reports the
+/// expected settings for each pool.
+TEST_F(AdmissionControllerTest, Config) {
+ // Point the RequestPoolService flags at the test configuration files.
+ FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+ FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
+
+ // The RequestPoolService created here will read the configuration files.
+ MetricGroup metrics("impala-metrics");
+ RequestPoolService pool_service(&metrics);
+
+ // Expected values, in argument order: max_requests, max_mem_resources,
+ // queue_timeout_ms, clamp_mem_limit_query_option.
+ CheckPoolConfig(pool_service, "non-existent queue", 5, -1, 30000, true);
+ CheckPoolConfig(pool_service, QUEUE_A, 1, 100000L * 1024L * 1024L, 50, true);
+ CheckPoolConfig(pool_service, QUEUE_B, 5, -1, 600000, true);
+ CheckPoolConfig(pool_service, QUEUE_C, 10, 128L * 1024L * 1024L, 30000, true);
+}
+
+} // end namespace impala
+
+/// Test entry point: calls InitGoogleTest(), InitCommonRuntime() (passing
+/// TestInfo::BE_TEST) and InitFeSupport() in that order, then returns the result of
+/// RUN_ALL_TESTS() as the process exit code.
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ impala::InitFeSupport();
+ return RUN_ALL_TESTS();
+}
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 569e24f..05b3274 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -219,16 +219,6 @@ static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
return true;
}
-// Returns the topic key for the pool at this backend, i.e. a string of the
-// form: "<pool_name><delimiter><backend_id>".
-static inline string MakePoolTopicKey(const string& pool_name,
- const string& backend_id) {
- // Ensure the backend_id does not contain the delimiter to ensure that the topic key
- // can be parsed properly by finding the last instance of the delimiter.
- DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
- return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
-}
-
// Return a debug string for the pool stats.
static string DebugPoolStats(const TPoolStats& stats) {
stringstream ss;
@@ -1324,4 +1314,12 @@ void AdmissionController::PopulatePerHostMemReservedAndAdmitted(
(*mem_map)[elem.first] = make_pair(elem.second, host_mem_admitted_[elem.first]);
}
}
+
+// Returns the topic key "<pool_name><TOPIC_KEY_DELIMITER><backend_id>".
+string AdmissionController::MakePoolTopicKey(
+ const string& pool_name, const string& backend_id) {
+ // Ensure the backend_id does not contain the delimiter to ensure that the topic key
+ // can be parsed properly by finding the last instance of the delimiter.
+ DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
+ return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
+}
}
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 14d59a9..85ea347 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -25,7 +25,7 @@
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
-#include <boost/thread/mutex.hpp>
+#include <gtest/gtest_prod.h>
#include "common/status.h"
#include "scheduling/request-pool-service.h"
@@ -95,10 +95,10 @@ enum class AdmissionOutcome {
/// reservation of the query. The final memory limit used is also clamped by the max/min
/// memory limits configured for the pool with an option to not enforce these limits on
/// the MEM_LIMIT query option (If both these max/min limits are not configured, then the
-/// estimates from planning are not used as a memory limit and only used for making
+/// estimates from planning are not used as a memory limit and are only used for making
/// admission decisions. Moreover the estimates will no longer have a lower bound based on
/// the largest initial reservation).
-/// The following three conditions must hold in order for the request to be admitted:
+/// The following four conditions must hold in order for the request to be admitted:
/// 1) The current pool configuration is valid.
/// 2) There must be enough memory resources available in this resource pool for the
/// request. The max memory resources configured for the resource pool specifies the
@@ -388,9 +388,8 @@ class AdmissionController {
void Queue(const QuerySchedule& schedule);
void Dequeue(const QuerySchedule& schedule, bool timed_out);
-
/// STATESTORE CALLBACK METHODS
- /// Updates the local_stats_.mem_reserved with the pool mem tracker. Called
+ /// Updates the local_stats_.backend_mem_reserved with the pool mem tracker. Called
/// before sending local_stats().
void UpdateMemTrackerStats();
@@ -484,6 +483,8 @@ class AdmissionController {
static const double EMA_MULTIPLIER;
void InitMetrics();
+
+ FRIEND_TEST(AdmissionControllerTest, Simple);
};
/// Map of pool names to pool stats. Accessed via GetPoolStats().
@@ -626,6 +627,13 @@ class AdmissionController {
/// Same as GetStalenessDetail() except caller must hold 'admission_ctrl_lock_'.
std::string GetStalenessDetailLocked(const std::string& prefix,
int64_t* ms_since_last_update = nullptr);
+
+ /// Returns the topic key for the pool at this backend, i.e. a string of the
+ /// form: "<pool_name><delimiter><backend_id>".
+ static string MakePoolTopicKey(const string& pool_name, const string& backend_id);
+
+ FRIEND_TEST(AdmissionControllerTest, Simple);
+ friend class AdmissionControllerTest;
};
} // namespace impala
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 365c981..f1c6770 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -26,9 +26,9 @@
#include "util/container-util.h"
#include "util/mem-info.h"
#include "util/network-util.h"
-#include "util/uid-util.h"
-#include "util/debug-util.h"
#include "util/parse-util.h"
+#include "util/test-info.h"
+#include "util/uid-util.h"
#include "common/names.h"
@@ -51,6 +51,18 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
Init();
}
+// Test-only constructor: stores the arguments, sets num_scan_ranges_ to 0 and seeds
+// next_instance_id_ from 'query_id'. Unlike the other constructor it does not call
+// Init().
+QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
+ const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+ : query_id_(query_id),
+ request_(request),
+ query_options_(query_options),
+ summary_profile_(summary_profile),
+ num_scan_ranges_(0),
+ next_instance_id_(query_id) {
+ // Init() is not called; this constructor is for white box testing only, so check
+ // that we are running as a test.
+ DCHECK(TestInfo::is_test());
+}
+
void QuerySchedule::Init() {
// extract TPlanFragments and order by fragment idx
vector<const TPlanFragment*> fragments;
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6021f98..acab122 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -157,6 +157,10 @@ class QuerySchedule {
const TQueryOptions& query_options, RuntimeProfile* summary_profile,
RuntimeProfile::EventSequence* query_events);
+ /// For testing only: build a QuerySchedule object but do not run Init().
+ QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
+ const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+
/// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
/// - all fragments have a FragmentExecParams
/// - all scan ranges are assigned