You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/01/10 18:36:03 UTC
[mesos] 05/08: Added unit tests for Master HTTP endpoints.
This is an automated email from the ASF dual-hosted git repository.
grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 3955b357e7b36b1b20724d920d9138be2180b8e3
Author: Benno Evers <be...@mesosphere.com>
AuthorDate: Wed Jan 9 14:29:54 2019 -0800
Added unit tests for Master HTTP endpoints.
This commit adds a set of unit tests to verify that
basic master HTTP endpoints still work correctly
in the presence of request caching.
Review: https://reviews.apache.org/r/69064/
---
src/Makefile.am | 3 +-
src/tests/master_load_tests.cpp | 559 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 561 insertions(+), 1 deletion(-)
diff --git a/src/Makefile.am b/src/Makefile.am
index 188a470..cd78525 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2507,7 +2507,8 @@ mesos_tests_SOURCES = \
tests/master_allocator_tests.cpp \
tests/master_authorization_tests.cpp \
tests/master_benchmarks.cpp \
- tests/master_contender_detector_tests.cpp \
+ tests/master_contender_detector_tests.cpp \
+ tests/master_load_tests.cpp \
tests/master_maintenance_tests.cpp \
tests/master_quota_tests.cpp \
tests/master_slave_reconciliation_tests.cpp \
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
new file mode 100644
index 0000000..4e9c8e8
--- /dev/null
+++ b/src/tests/master_load_tests.cpp
@@ -0,0 +1,559 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <mesos/mesos.hpp>
+
+#include <process/async.hpp>
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include "tests/mesos.hpp"
+
+using mesos::authorization::VIEW_EXECUTOR;
+using mesos::authorization::VIEW_FLAGS;
+using mesos::authorization::VIEW_FRAMEWORK;
+using mesos::authorization::VIEW_ROLE;
+using mesos::authorization::VIEW_TASK;
+
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+
+using mesos::master::detector::MasterDetector;
+
+using process::async;
+using process::Clock;
+using process::delay;
+using process::Future;
+using process::Message;
+using process::Owned;
+using process::PID;
+using process::Promise;
+using process::Time;
+
+using process::http::Response;
+using process::http::Request;
+using process::http::Headers;
+
+using testing::SaveArg;
+
+
+// The tests in this file are designed to verify that the caching
+// of read-only requests inside a Mesos master is implemented correctly,
+// i.e. cacheable requests are cached and non-cacheable requests will
+// return different responses.
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class BlockingAuthorizer;
+
+
+// Test fixture that owns a small mock cluster and offers helpers for
+// firing batches of HTTP requests at its master.
+class MasterLoadTest : public MesosTest
+{
+protected:
+  // The pieces of a single HTTP request that the tests vary.
+  struct RequestDescriptor
+  {
+    std::string endpoint;
+    std::string principal;
+    std::string query;
+    process::http::Headers headers;
+
+    // Ordering that lets descriptors be used as keys of an ordered
+    // associative container.
+    bool operator<(const RequestDescriptor& other) const;
+  };
+
+  // The "mock cluster" created by `prepareCluster()`. These are `protected`
+  // so that the test body can access them if required.
+  //
+  // NOTE: Members are destroyed in reverse declaration order, so the
+  // order below must not be changed casually.
+  Owned<BlockingAuthorizer> authorizer_;
+  Owned<cluster::Master> master_;
+  Owned<MasterDetector> detector_;
+  Owned<MockScheduler> scheduler_;
+  Owned<TestingMesosSchedulerDriver> driver_;
+  Owned<cluster::Slave> slave_;
+  FrameworkID frameworkId_;
+
+  // Launches a fixed number of equivalent requests per passed request
+  // descriptor, while manipulating the master in order to ensure all
+  // requests will appear consecutively in the master queue.
+  // Each entry of the returned map pairs a pending response with the
+  // descriptor it was created from.
+  std::multimap<RequestDescriptor, Future<Response>>
+  launchSimultaneousRequests(
+      const std::vector<RequestDescriptor>& descriptors);
+
+  // Prepares a mock cluster with 1 master, 1 agent and 1 framework,
+  // with the given authorizer being wrapped in a `BlockingAuthorizer`.
+  void prepareCluster(Authorizer* authorizer);
+};
+
+
+// An authorizer actor that forwards every call to an underlying
+// authorizer, but will not satisfy any futures returned from
+// `getObjectApprover()` until it is told to via `unleash()`,
+// presumably from the test body.
+//
+// It effectively acts as a giant gate for certain requests.
+class BlockingAuthorizerProcess
+  : public process::Process<BlockingAuthorizerProcess>
+{
+public:
+  // `underlying` is stored as a raw pointer and is never deleted by
+  // this class, so the caller has to keep it alive for as long as this
+  // process may receive calls.
+  //
+  // The constructor is `explicit` so that an `Authorizer*` is never
+  // silently converted into a process object.
+  explicit BlockingAuthorizerProcess(Authorizer* underlying)
+    : ProcessBase(process::ID::generate("blocking-authorizer")),
+      underlying_(underlying),
+      blocked_(true) {}
+
+  // Plain authorization requests are never gated; they go straight
+  // to the underlying authorizer.
+  Future<bool> authorized(const authorization::Request& request)
+  {
+    return underlying_->authorized(request);
+  }
+
+  // Asks the underlying authorizer right away. While the gate is
+  // closed, the caller receives a placeholder future instead of the
+  // underlying one; the two are connected in `unleash()`.
+  Future<Owned<ObjectApprover>> getObjectApprover(
+      const Option<authorization::Subject>& subject,
+      const authorization::Action& action)
+  {
+    Future<Owned<ObjectApprover>> future =
+      underlying_->getObjectApprover(subject, action);
+
+    if (!blocked_) {
+      return future;
+    }
+
+    // The future is linked to the returned promise in `unleash()`.
+    // Both queues are pushed in lockstep, so entries at the same
+    // position belong together.
+    futures_.push(future);
+    promises_.emplace();
+    return promises_.back().future();
+  }
+
+  // Returns the number of `getObjectApprover()` calls that are
+  // currently being held back.
+  Future<size_t> pending()
+  {
+    return promises_.size();
+  }
+
+  // Opens the gate: every held-back call is associated with the result
+  // of the underlying authorizer, and every later call to
+  // `getObjectApprover()` passes through unblocked.
+  Future<Nothing> unleash()
+  {
+    CHECK_EQ(promises_.size(), futures_.size());
+
+    while (!promises_.empty()) {
+      promises_.front().associate(futures_.front());
+
+      futures_.pop();
+      promises_.pop();
+    }
+
+    blocked_ = false;
+
+    return Nothing();
+  }
+
+private:
+  Authorizer* underlying_;
+
+  // Results of the underlying authorizer and the placeholder promises
+  // handed out to callers, in call order.
+  std::queue<Future<Owned<ObjectApprover>>> futures_;
+  std::queue<Promise<Owned<ObjectApprover>>> promises_;
+
+  // True until `unleash()` has been called.
+  bool blocked_;
+};
+
+
+// `Authorizer` facade in front of a `BlockingAuthorizerProcess`.
+//
+// Every method dispatches onto the process, which is spawned in the
+// constructor and terminated and waited for in the destructor.
+class BlockingAuthorizer : public Authorizer
+{
+public:
+  // `underlying` is handed to the process as a raw pointer; this class
+  // does not delete it.
+  //
+  // The constructor is `explicit` so that an `Authorizer*` is never
+  // implicitly converted into a `BlockingAuthorizer`.
+  explicit BlockingAuthorizer(Authorizer* underlying)
+    : process_(new BlockingAuthorizerProcess(underlying))
+  {
+    process::spawn(process_.get());
+  }
+
+  // Stops the process and waits until it is gone before `process_`
+  // is released.
+  ~BlockingAuthorizer() override
+  {
+    process::terminate(process_.get());
+    process::wait(process_.get());
+  }
+
+  Future<bool> authorized(const authorization::Request& request) override
+  {
+    return process::dispatch(
+        process_.get(),
+        &BlockingAuthorizerProcess::authorized,
+        request);
+  }
+
+  Future<Owned<ObjectApprover>> getObjectApprover(
+      const Option<authorization::Subject>& subject,
+      const authorization::Action& action) override
+  {
+    return process::dispatch(
+        process_.get(),
+        &BlockingAuthorizerProcess::getObjectApprover,
+        subject,
+        action);
+  }
+
+  // Number of `getObjectApprover()` calls currently held back by
+  // the process.
+  Future<size_t> pending()
+  {
+    return process::dispatch(
+        process_.get(),
+        &BlockingAuthorizerProcess::pending);
+  }
+
+  // Releases all held-back calls and stops blocking new ones.
+  Future<Nothing> unleash()
+  {
+    return process::dispatch(
+        process_.get(),
+        &BlockingAuthorizerProcess::unleash);
+  }
+
+private:
+  Owned<BlockingAuthorizerProcess> process_;
+};
+
+
+// Brings up the mock cluster used by the tests: a master whose
+// authorizer is a `BlockingAuthorizer` wrapping `authorizer`, one
+// framework and one agent. The created objects are stored in the
+// fixture members.
+void MasterLoadTest::prepareCluster(Authorizer* authorizer)
+{
+  // Master, with all authorization going through the blocking wrapper.
+  authorizer_.reset(new BlockingAuthorizer(authorizer));
+
+  master::Flags flagsForMaster = CreateMasterFlags();
+  Try<Owned<cluster::Master>> startedMaster =
+    StartMaster(authorizer_.get(), flagsForMaster);
+
+  ASSERT_SOME(startedMaster);
+
+  master_ = startedMaster.get();
+  detector_ = master_->createDetector();
+
+  // Set up the futures for both registration messages before the
+  // framework or the agent is started.
+  Future<FrameworkRegisteredMessage> frameworkRegistered =
+    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> agentRegistered =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Framework: a mock scheduler whose `registered()` callback records
+  // the framework id in `frameworkId_`.
+  scheduler_.reset(new MockScheduler());
+  driver_.reset(
+      new TestingMesosSchedulerDriver(scheduler_.get(), detector_.get()));
+
+  EXPECT_CALL(*scheduler_, registered(driver_.get(), _, _))
+    .WillOnce(SaveArg<1>(&frameworkId_));
+
+  driver_->start();
+
+  AWAIT_READY(frameworkRegistered);
+
+  // Agent.
+  slave::Flags flagsForAgent = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> startedAgent =
+    StartSlave(detector_.get(), flagsForAgent);
+
+  ASSERT_SOME(startedAgent);
+
+  slave_ = startedAgent.get();
+
+  AWAIT_READY(agentRegistered);
+}
+
+
+// Sends a fixed number of identical HTTP GET requests per descriptor
+// and arranges for all of them to sit in the master's queue before the
+// master gets to process any of them. Returns the pending responses,
+// keyed by the descriptor that produced them.
+std::multimap<MasterLoadTest::RequestDescriptor, Future<Response>>
+MasterLoadTest::launchSimultaneousRequests(
+    const std::vector<RequestDescriptor>& descriptors)
+{
+  // NOTE: On Mac, the number of open files (and thus tcp connections)
+  // is limited to 256 by default, so this number is tweaked to stay slightly
+  // lower than that at 40*5==200 connections for the most demanding test.
+  const size_t REQUESTS_PER_DESCRIPTOR = 40;
+  const size_t expectedPending = REQUESTS_PER_DESCRIPTOR * descriptors.size();
+
+  std::multimap<RequestDescriptor, Future<Response>> inFlight;
+
+  // `AWAIT_READY()` and `ASSERT_*` can only be used where a plain
+  // `return;` is valid, so the actual work lives in a `void` lambda.
+  auto fireAndRelease = [&]() {
+    // Step 1: send out every request described by `descriptors`
+    // and remember the pending response in `inFlight`.
+    for (const RequestDescriptor& descriptor : descriptors) {
+      for (size_t sent = 0; sent < REQUESTS_PER_DESCRIPTOR; ++sent) {
+        inFlight.emplace(
+            descriptor,
+            process::http::get(
+                master_->pid,
+                descriptor.endpoint,
+                descriptor.query,
+                descriptor.headers));
+      }
+    }
+
+    // Step 2: poll the authorizer until all the HTTP events have
+    // reached the master and are awaiting authorization. Other requests
+    // may get mixed into the authorizer, so the test bodies must send
+    // ample requests to ensure cache hits.
+    const Time pollingStart = Clock::now();
+
+    while (true) {
+      Future<size_t> held = authorizer_->pending();
+      AWAIT_READY(held);
+
+      // Protect against a potential infinite loop introduced by future bugs.
+      ASSERT_TRUE(Clock::now() - pollingStart < Seconds(20));
+
+      if (held.get() >= expectedPending) {
+        break;
+      }
+    }
+
+    // Step 3: block the master actor, since the master must not start
+    // batching until it is queued up with all the HTTP requests.
+    // NOTE: This function might be out of scope when the dispatch is
+    // scheduled, so the promise is shared and captured by value.
+    auto gate = std::make_shared<Promise<Nothing>>();
+    process::dispatch(master_->pid, [gate]() {
+      gate->future().await();
+    });
+
+    // Step 4: unblock the BlockingAuthorizer. This should trigger all
+    // the deferrals onto the master from the authorizer's thread. When
+    // this future completes, the master's queue should be full of
+    // batched requests.
+    AWAIT_READY(authorizer_->unleash());
+
+    // Step 5: unblock the master, so it can perform the batching.
+    gate->set(Nothing());
+  };
+
+  fireAndRelease();
+
+  return inFlight;
+}
+
+
+// Orders descriptors lexicographically by endpoint, then principal,
+// then query.
+//
+// Comparing the endpoint alone would make descriptors that differ only
+// in principal or query equivalent keys in an ordered container.
+// `headers` is not part of the ordering.
+bool MasterLoadTest::RequestDescriptor::operator<(
+    const RequestDescriptor& other) const
+{
+  if (endpoint != other.endpoint) {
+    return endpoint < other.endpoint;
+  }
+
+  if (principal != other.principal) {
+    return principal < other.principal;
+  }
+
+  return query < other.query;
+}
+
+
+// Test that simultaneous requests to various different endpoints
+// all return the expected result.
+//
+// Each HTTP response body is compared against a reference response
+// obtained by calling the master's `ReadOnlyHandler` directly.
+TEST_F(MasterLoadTest, SimultaneousBatchedRequests)
+{
+  MockAuthorizer mockAuthorizer;
+  prepareCluster(&mockAuthorizer);
+
+  // Set up the actual test: one descriptor per endpoint, all using
+  // the same credentials.
+  RequestDescriptor descriptor1;
+  descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  descriptor1.endpoint = "/state";
+
+  RequestDescriptor descriptor2 = descriptor1;
+  descriptor2.endpoint = "/state-summary";
+
+  RequestDescriptor descriptor3 = descriptor1;
+  descriptor3.endpoint = "/frameworks";
+
+  RequestDescriptor descriptor4 = descriptor1;
+  descriptor4.endpoint = "/slaves";
+
+  RequestDescriptor descriptor5 = descriptor1;
+  descriptor5.endpoint = "/roles";
+
+  auto responses = launchSimultaneousRequests(
+      {descriptor1, descriptor2, descriptor3, descriptor4, descriptor5});
+
+  // The handler and the authorizer used for the reference responses do
+  // not depend on the individual request, so they are created once
+  // rather than once per response.
+  mesos::internal::master::Master* master = master_->master.get();
+  mesos::internal::master::Master::ReadOnlyHandler readOnlyHandler(master);
+  MockAuthorizer referenceAuthorizer;
+
+  foreachpair (
+      const RequestDescriptor& request,
+      Future<Response>& response,
+      responses)
+  {
+    AWAIT_READY(response);
+
+    // TODO(bevers): Ideally we would not use HTTP at all to generate
+    // the reference response, but some master-internal function
+    // like `model(Summary<Master>)`.
+    Try<hashmap<std::string, std::string>> queryParameters =
+      process::http::query::decode(request.query);
+
+    ASSERT_SOME(queryParameters);
+
+    process::http::authentication::Principal principal(request.principal);
+    Future<Owned<ObjectApprovers>> approvers = ObjectApprovers::create(
+        &referenceAuthorizer,
+        principal,
+        {VIEW_ROLE, VIEW_FLAGS, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR});
+
+    // Wait with the usual test timeout instead of blocking on
+    // `.get()`, which would hang the test if the future never
+    // became ready.
+    AWAIT_READY(approvers);
+
+    Response reference;
+    if (request.endpoint == "/state") {
+      reference =
+        readOnlyHandler.state(queryParameters.get(), approvers.get());
+    } else if (request.endpoint == "/state-summary") {
+      reference =
+        readOnlyHandler.stateSummary(queryParameters.get(), approvers.get());
+    } else if (request.endpoint == "/roles") {
+      reference =
+        readOnlyHandler.roles(queryParameters.get(), approvers.get());
+    } else if (request.endpoint == "/frameworks") {
+      reference =
+        readOnlyHandler.frameworks(queryParameters.get(), approvers.get());
+    } else if (request.endpoint == "/slaves") {
+      reference =
+        readOnlyHandler.slaves(queryParameters.get(), approvers.get());
+    } else {
+      UNREACHABLE();
+    }
+
+    EXPECT_EQ(reference.body, response->body);
+  }
+
+  // Ensure that we actually hit the metrics code path while executing
+  // the test.
+  JSON::Object metrics = Metrics();
+  ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
+  ASSERT_GT(
+      metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
+      0u);
+}
+
+
+// Test that simultaneous requests on a single endpoint for two
+// different principals return different results.
+//
+// The first principal is allowed to view frameworks and the second one
+// is not, so only the first may see the registered framework in the
+// `/frameworks` response.
+TEST_F(MasterLoadTest, Principals)
+{
+  // Set up a proper authorizer for this test.
+  master::Flags flags = CreateMasterFlags();
+
+  {
+    // Default principal is allowed to view frameworks.
+    mesos::ACL::ViewFramework* acl = flags.acls->add_view_frameworks();
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Default principal 2 is not allowed to view frameworks.
+    mesos::ACL::ViewFramework* acl = flags.acls->add_view_frameworks();
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Check that the authorizer could actually be created instead of
+  // calling `.get()` on the result unconditionally.
+  Try<Authorizer*> created = Authorizer::create(flags.acls.get());
+  ASSERT_SOME(created);
+
+  // Take ownership of the raw pointer so that the authorizer is deleted
+  // when the test body ends rather than leaked.
+  Owned<Authorizer> localAuthorizer(created.get());
+  prepareCluster(localAuthorizer.get());
+
+  // Set up the requests with correct principals.
+  RequestDescriptor descriptor1;
+  descriptor1.endpoint = "/frameworks";
+  descriptor1.principal = DEFAULT_CREDENTIAL.principal();
+  descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  RequestDescriptor descriptor2 = descriptor1;
+  descriptor2.principal = DEFAULT_CREDENTIAL_2.principal();
+  descriptor2.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
+
+  auto responses = launchSimultaneousRequests({descriptor1, descriptor2});
+
+  // Minimal JSON that a response listing the registered framework
+  // has to contain.
+  JSON::Value expected = JSON::parse(
+      "{"
+        "\"frameworks\": [{"
+          "\"id\": \"" + stringify(frameworkId_) + "\""
+        "}]"
+      "}"
+    ).get();
+
+  foreachpair (
+      const RequestDescriptor& request,
+      Future<Response>& response,
+      responses)
+  {
+    AWAIT_READY(response);
+
+    Try<JSON::Value> jsonResponse = JSON::parse(response->body);
+    ASSERT_SOME(jsonResponse);
+
+    if (request.principal == DEFAULT_CREDENTIAL.principal()) {
+      EXPECT_TRUE(jsonResponse->contains(expected))
+        << "Principal " << request.principal
+        << " got HTTP response: " << response->body;
+    } else {
+      EXPECT_FALSE(jsonResponse->contains(expected))
+        << "Principal " << request.principal
+        << " got HTTP response: " << response->body;
+    }
+  }
+
+  // Ensure that we actually hit the metrics code path while executing
+  // the test.
+  JSON::Object metrics = Metrics();
+  ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
+  ASSERT_GT(
+      metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
+      0u);
+}
+
+
+// Test that simultaneous requests on a single endpoint with
+// different query parameters produce different results.
+//
+// Three kinds of `/frameworks` requests are sent: without a query, with
+// a filter for a framework id that does not exist, and with a `jsonp`
+// parameter.
+TEST_F(MasterLoadTest, QueryParameters)
+{
+  MockAuthorizer mockAuthorizer;
+  prepareCluster(&mockAuthorizer);
+
+  RequestDescriptor plain;
+  plain.endpoint = "/frameworks";
+  plain.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  plain.query = "";
+
+  RequestDescriptor filtered = plain;
+  filtered.query = "framework_id=nonexisting-framework-id";
+
+  RequestDescriptor jsonp = plain;
+  jsonp.query = "jsonp=xxx";
+
+  auto responses = launchSimultaneousRequests({plain, filtered, jsonp});
+
+  // Minimal JSON that a response listing the registered framework
+  // has to contain.
+  const std::string expectedText =
+    "{\"frameworks\": [{\"id\": \"" + stringify(frameworkId_) + "\"}]}";
+
+  JSON::Value expected = JSON::parse(expectedText).get();
+
+  foreachpair (
+      const RequestDescriptor& request,
+      Future<Response>& response,
+      responses)
+  {
+    AWAIT_READY(response);
+
+    // Responses to `jsonp` requests are not parsed as JSON; only the
+    // presence of the requested callback name is checked.
+    const bool isJsonp = strings::contains(request.query, "jsonp");
+    if (isJsonp) {
+      EXPECT_TRUE(strings::contains(response->body, "xxx"))
+        << "Got HTTP response: " << response->body;
+      continue;
+    }
+
+    Try<JSON::Value> parsed = JSON::parse(response->body);
+    ASSERT_SOME(parsed);
+
+    // Only the request without a framework filter may list the
+    // registered framework.
+    const bool hasFilter = strings::contains(request.query, "framework_id");
+    if (!hasFilter) {
+      EXPECT_TRUE(parsed->contains(expected))
+        << "Got HTTP response: " << response->body;
+    } else {
+      EXPECT_FALSE(parsed->contains(expected))
+        << "Got HTTP response: " << response->body;
+    }
+  }
+
+  // Ensure that we actually hit the metrics code path while executing
+  // the test.
+  JSON::Object metrics = Metrics();
+  ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
+  ASSERT_GT(
+      metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
+      0u);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {