You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/26 06:23:41 UTC

[impala] 04/04: IMPALA-8446: Create a unit test for Admission Controller.

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2dda47ec27e8f15810907d77c6e41c275355cea6
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Mon Apr 22 09:52:28 2019 -0700

    IMPALA-8446: Create a unit test for Admission Controller.
    
    This test allows construction of white box tests that exercise Admission
    Controller code.
    
    The initial test cases are
    (1) a test which does the following:
      + creates a RequestPoolService which reads some Admission Controller
        configuration files
      + creates an Admission Controller object
      + creates a QuerySchedule for a query
      + tests if the query can be admitted by the Admission Controller
      + simulates activity in the cluster which consumes memory
      + tests that the earlier query cannot now be admitted by the Admission
        Controller
    (2) a test which verifies that the configuration files are read
        correctly.
    
    TESTING:
    Ran end-to-end tests cleanly and checked that the new tests were
    executed.
    
    Change-Id: I8a840590b868f2df1a06f3f397b7b0fc2b29462c
    Reviewed-on: http://gerrit.cloudera.org:8080/13078
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/CMakeLists.txt               |   1 +
 be/src/scheduling/admission-controller-test.cc | 223 +++++++++++++++++++++++++
 be/src/scheduling/admission-controller.cc      |  18 +-
 be/src/scheduling/admission-controller.h       |  18 +-
 be/src/scheduling/query-schedule.cc            |  16 +-
 be/src/scheduling/query-schedule.h             |   4 +
 6 files changed, 263 insertions(+), 17 deletions(-)

diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 998a05c..a1ef269 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -37,3 +37,4 @@ add_dependencies(Scheduling gen-deps)
 ADD_BE_LSAN_TEST(scheduler-test)
 ADD_BE_LSAN_TEST(backend-config-test)
 ADD_BE_LSAN_TEST(hash-ring-test)
+ADD_BE_LSAN_TEST(admission-controller-test)
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
new file mode 100644
index 0000000..c3fa309
--- /dev/null
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/admission-controller.h"
+
+#include "common/names.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
+#include "runtime/exec-env.h"
+#include "runtime/runtime-state.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+#include "testutil/gtest-util.h"
+
+// Access the flags that are defined in RequestPoolService.
+DECLARE_string(fair_scheduler_allocation_path);
+DECLARE_string(llama_site_path);
+
+namespace impala {
+// Base directory for test resources; empty if unset (std::string(nullptr) is UB).
+static const string IMPALA_HOME(getenv("IMPALA_HOME") ? getenv("IMPALA_HOME") : "");
+
+// Queues used in the configuration files fair-scheduler-test2.xml and
+// llama-site-test2.xml.
+static const string QUEUE_A = "root.queueA";
+static const string QUEUE_B = "root.queueB";
+static const string QUEUE_C = "root.queueC";
+
+// Host names
+static const string HOST_1 = "host1:25000";
+static const string HOST_2 = "host2:25000";
+
+/// Parent class for Admission Controller tests.
+/// Common code and constants should go here.
+class AdmissionControllerTest : public testing::Test {
+ protected:
+  boost::scoped_ptr<TestEnv> test_env_;
+
+  void SetUp() override {
+    // Establish a TestEnv so that ExecEnv works in tests.
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
+
+  void TearDown() override {}
+
+  /// Build a TTopicDelta object for IMPALA_REQUEST_QUEUE_TOPIC.
+  static TTopicDelta MakeTopicDelta(const bool is_delta) {
+    TTopicDelta delta;
+    delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
+    delta.is_delta = is_delta;
+    return delta;
+  }
+
+  /// Build a TPoolStats object.
+  static TPoolStats MakePoolStats(const int64_t backend_mem_reserved,
+      const int64_t num_admitted_running, const int64_t num_queued) {
+    TPoolStats stats;
+    stats.backend_mem_reserved = backend_mem_reserved;
+    stats.num_admitted_running = num_admitted_running;
+    stats.num_queued = num_queued;
+    return stats;
+  }
+
+  /// Add a TPoolStats to the TTopicDelta 'topic' with a key created from 'host' and
+  /// 'pool_name'.
+  static void AddStatsToTopic(
+      TTopicDelta* topic, const string& host, const string& pool_name, TPoolStats stats) {
+    // Build topic item.
+    TTopicItem item;
+    item.key = AdmissionController::MakePoolTopicKey(pool_name, host);
+    ThriftSerializer serializer(false);
+    Status status = serializer.SerializeToString(&stats, &item.value);
+    ASSERT_OK(status);  // Checked in release builds too, unlike DCHECK.
+
+    // Add to the topic.
+    topic->topic_entries.push_back(item);
+  }
+
+  /// Build a TQueryExecRequest object.
+  static TQueryExecRequest MakeQueryExecRequest(
+      const string& pool_name, const int64_t per_host_mem_estimate) {
+    TQueryExecRequest request;
+    request.query_ctx.request_pool = pool_name;
+    request.__set_per_host_mem_estimate(per_host_mem_estimate);
+    return request;
+  }
+
+  /// Check that PoolConfig can be read from a RequestPoolService, and that the
+  /// configured values are as expected.
+  static void CheckPoolConfig(RequestPoolService& request_pool_service,
+      const string pool_name, const int64_t max_requests, const int64_t max_mem_resources,
+      const int64_t queue_timeout_ms, const bool clamp_mem_limit_query_option,
+      const int64_t min_query_mem_limit = 0, const int64_t max_query_mem_limit = 0) {
+    TPoolConfig config;
+    ASSERT_OK(request_pool_service.GetPoolConfig(pool_name, &config));
+
+    ASSERT_EQ(max_requests, config.max_requests);
+    ASSERT_EQ(max_mem_resources, config.max_mem_resources);
+    ASSERT_EQ(queue_timeout_ms, config.queue_timeout_ms);
+    ASSERT_EQ(clamp_mem_limit_query_option, config.clamp_mem_limit_query_option);
+    ASSERT_EQ(min_query_mem_limit, config.min_query_mem_limit);
+    ASSERT_EQ(max_query_mem_limit, config.max_query_mem_limit);
+  }
+
+  /// Return the path of the configuration file in the test resources directory
+  /// that has name 'file_name'.
+  static string GetResourceFile(const string& file_name) {
+    return Substitute("$0/fe/src/test/resources/$1", IMPALA_HOME, file_name);
+  }
+};
+
+/// Test that AdmissionController will admit a query into a pool, then simulate other
+/// work being added to the pool, and then test that the AdmissionController will no
+/// longer admit the query.
+/// This is a single threaded test so we access the internal data structures of
+/// the AdmissionController object, and call methods such as 'GetPoolStats' without
+/// taking the admission_ctrl_lock_ lock.
+TEST_F(AdmissionControllerTest, Simple) {
+  // Pass the paths of the configuration files as command line flags.
+  FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
+  FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+
+  // Create a RequestPoolService which will read the configuration files.
+  MetricGroup metric_group("impala-metrics");
+  RequestPoolService request_pool_service(&metric_group);
+
+  // Create an AdmissionController running on host0.
+  TNetworkAddress addr = MakeNetworkAddress("host0", 25000);
+  AdmissionController admission_controller(
+      nullptr, &request_pool_service, &metric_group, addr);
+
+  // Get the PoolConfig for QUEUE_C ("root.queueC").
+  TPoolConfig config_c;
+  ASSERT_OK(request_pool_service.GetPoolConfig(QUEUE_C, &config_c));
+
+  // Create a QuerySchedule to run on QUEUE_C.
+  TQueryExecRequest request = MakeQueryExecRequest(QUEUE_C, 1000);
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "pool1");
+  TUniqueId unique_id;
+  TQueryOptions query_options;
+  QuerySchedule query_schedule(unique_id, request, query_options, profile);
+  query_schedule.UpdateMemoryRequirements(config_c);
+
+  // Check that the AdmissionController initially has no data about other hosts.
+  ASSERT_EQ(0, admission_controller.host_mem_reserved_.size());
+
+  // Check that the query can be admitted.
+  string not_admitted_reason;
+  ASSERT_TRUE(admission_controller.CanAdmitRequest(
+      query_schedule, config_c, true /* admit_from_queue */, &not_admitted_reason));
+
+  // Make a TTopicDelta describing some activity on host1 and host2.
+  TTopicDelta membership = MakeTopicDelta(false);
+  AddStatsToTopic(&membership, HOST_1, QUEUE_B, MakePoolStats(1000, 1, 0));
+  AddStatsToTopic(&membership, HOST_1, QUEUE_C, MakePoolStats(5000, 10, 0));
+  AddStatsToTopic(&membership, HOST_2, QUEUE_C, MakePoolStats(5000, 1, 0));
+
+  // Imitate the StateStore passing updates on query activity to the
+  // AdmissionController.
+  StatestoreSubscriber::TopicDeltaMap incoming_topic_deltas;
+  incoming_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, membership);
+  vector<TTopicDelta> outgoing_topic_updates;
+  admission_controller.UpdatePoolStats(incoming_topic_deltas, &outgoing_topic_updates);
+
+  // Check that the AdmissionController has aggregated the remote stats.
+  ASSERT_EQ(3, admission_controller.host_mem_reserved_.size());
+  ASSERT_EQ(6000, admission_controller.host_mem_reserved_[HOST_1]);  // 1000 + 5000
+  ASSERT_EQ(5000, admission_controller.host_mem_reserved_[HOST_2]);
+
+  // Check the PoolStats for QUEUE_C.
+  AdmissionController::PoolStats* pool_stats = admission_controller.GetPoolStats(QUEUE_C);
+  ASSERT_EQ(10000, pool_stats->agg_mem_reserved_);  // 5000 + 5000
+  ASSERT_EQ(11, pool_stats->agg_num_running_);  // 10 + 1
+
+  // Test that the query cannot be admitted now.
+  ASSERT_FALSE(admission_controller.CanAdmitRequest(
+      query_schedule, config_c, true /* admit_from_queue */, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(
+      not_admitted_reason, "number of running queries 11 is at or over limit 10.");
+}
+
+/// Test that RequestPoolService correctly reads configuration files.
+TEST_F(AdmissionControllerTest, Config) {
+  // Pass the paths of the configuration files as command line flags.
+  FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
+  FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+
+  // Create a RequestPoolService which will read the configuration files.
+  MetricGroup metric_group("impala-metrics");
+
+  RequestPoolService request_pool_service(&metric_group);
+
+  // Check each pool's max_requests, max_mem_resources, queue timeout and clamp flag.
+  CheckPoolConfig(request_pool_service, "non-existent queue", 5, -1, 30000, true);
+  CheckPoolConfig(request_pool_service, QUEUE_A, 1, 100000L * 1024L * 1024L, 50, true);
+  CheckPoolConfig(request_pool_service, QUEUE_B, 5, -1, 600000, true);
+  CheckPoolConfig(request_pool_service, QUEUE_C, 10, 128L * 1024L * 1024L, 30000, true);
+}
+
+} // end namespace impala
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);  // Removes gtest's own flags from argv.
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  return RUN_ALL_TESTS();  // Returns 0 only if all tests pass.
+}
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 569e24f..05b3274 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -219,16 +219,6 @@ static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
   return true;
 }
 
-// Returns the topic key for the pool at this backend, i.e. a string of the
-// form: "<pool_name><delimiter><backend_id>".
-static inline string MakePoolTopicKey(const string& pool_name,
-    const string& backend_id) {
-  // Ensure the backend_id does not contain the delimiter to ensure that the topic key
-  // can be parsed properly by finding the last instance of the delimiter.
-  DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
-  return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
-}
-
 // Return a debug string for the pool stats.
 static string DebugPoolStats(const TPoolStats& stats) {
   stringstream ss;
@@ -1324,4 +1314,12 @@ void AdmissionController::PopulatePerHostMemReservedAndAdmitted(
     (*mem_map)[elem.first] = make_pair(elem.second, host_mem_admitted_[elem.first]);
   }
 }
+
+string AdmissionController::MakePoolTopicKey(
+    const string& pool_name, const string& backend_id) {
+  // The backend_id must not contain the delimiter, so that the topic key can be
+  // parsed properly by finding the last instance of the delimiter.
+  DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
+  return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
+}
 }
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 14d59a9..85ea347 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -25,7 +25,7 @@
 
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
-#include <boost/thread/mutex.hpp>
+#include <gtest/gtest_prod.h>
 
 #include "common/status.h"
 #include "scheduling/request-pool-service.h"
@@ -95,10 +95,10 @@ enum class AdmissionOutcome {
 /// reservation of the query. The final memory limit used is also clamped by the max/min
 /// memory limits configured for the pool with an option to not enforce these limits on
 /// the MEM_LIMIT query option (If both these max/min limits are not configured, then the
-/// estimates from planning are not used as a memory limit and only used for making
+/// estimates from planning are not used as a memory limit and are only used for making
 /// admission decisions. Moreover the estimates will no longer have a lower bound based on
 /// the largest initial reservation).
-/// The following three conditions must hold in order for the request to be admitted:
+/// The following four conditions must hold in order for the request to be admitted:
 ///  1) The current pool configuration is valid.
 ///  2) There must be enough memory resources available in this resource pool for the
 ///     request. The max memory resources configured for the resource pool specifies the
@@ -388,9 +388,8 @@ class AdmissionController {
     void Queue(const QuerySchedule& schedule);
     void Dequeue(const QuerySchedule& schedule, bool timed_out);
 
-
     /// STATESTORE CALLBACK METHODS
-    /// Updates the local_stats_.mem_reserved with the pool mem tracker. Called
+    /// Updates the local_stats_.backend_mem_reserved with the pool mem tracker. Called
     /// before sending local_stats().
     void UpdateMemTrackerStats();
 
@@ -484,6 +483,8 @@ class AdmissionController {
     static const double EMA_MULTIPLIER;
 
     void InitMetrics();
+
+    FRIEND_TEST(AdmissionControllerTest, Simple);
   };
 
   /// Map of pool names to pool stats. Accessed via GetPoolStats().
@@ -626,6 +627,13 @@ class AdmissionController {
   /// Same as GetStalenessDetail() except caller must hold 'admission_ctrl_lock_'.
   std::string GetStalenessDetailLocked(const std::string& prefix,
       int64_t* ms_since_last_update = nullptr);
+
+  /// Topic key for the pool at this backend: "<pool_name><delimiter><backend_id>".
+  static std::string MakePoolTopicKey(
+      const std::string& pool_name, const std::string& backend_id);
+
+  FRIEND_TEST(AdmissionControllerTest, Simple);
+  friend class AdmissionControllerTest;
 };
 
 } // namespace impala
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 365c981..f1c6770 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -26,9 +26,9 @@
 #include "util/container-util.h"
 #include "util/mem-info.h"
 #include "util/network-util.h"
-#include "util/uid-util.h"
-#include "util/debug-util.h"
 #include "util/parse-util.h"
+#include "util/test-info.h"
+#include "util/uid-util.h"
 
 #include "common/names.h"
 
@@ -51,6 +51,18 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
   Init();
 }
 
+QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
+    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+  : query_id_(query_id),
+    request_(request),
+    query_options_(query_options),
+    summary_profile_(summary_profile),
+    num_scan_ranges_(0),
+    next_instance_id_(query_id) {
+  // Init() is deliberately not called: this constructor is for white box testing only.
+  DCHECK(TestInfo::is_test());
+}
+
 void QuerySchedule::Init() {
   // extract TPlanFragments and order by fragment idx
   vector<const TPlanFragment*> fragments;
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6021f98..acab122 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -157,6 +157,10 @@ class QuerySchedule {
       const TQueryOptions& query_options, RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
+  /// For testing only: build a QuerySchedule object but do not run Init().
+  QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
+      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
   /// - all scan ranges are assigned