You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/01/10 18:36:03 UTC

[mesos] 05/08: Added unit tests for Master HTTP endpoints.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3955b357e7b36b1b20724d920d9138be2180b8e3
Author: Benno Evers <be...@mesosphere.com>
AuthorDate: Wed Jan 9 14:29:54 2019 -0800

    Added unit tests for Master HTTP endpoints.
    
    This commit adds a set of unit tests to verify that
    basic master HTTP endpoints still work correctly
    in the presence of request caching.
    
    Review: https://reviews.apache.org/r/69064/
---
 src/Makefile.am                 |   3 +-
 src/tests/master_load_tests.cpp | 559 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 561 insertions(+), 1 deletion(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index 188a470..cd78525 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2507,7 +2507,8 @@ mesos_tests_SOURCES =						\
   tests/master_allocator_tests.cpp				\
   tests/master_authorization_tests.cpp				\
   tests/master_benchmarks.cpp					\
-  tests/master_contender_detector_tests.cpp			\
+  tests/master_contender_detector_tests.cpp     \
+  tests/master_load_tests.cpp					\
   tests/master_maintenance_tests.cpp				\
   tests/master_quota_tests.cpp					\
   tests/master_slave_reconciliation_tests.cpp			\
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
new file mode 100644
index 0000000..4e9c8e8
--- /dev/null
+++ b/src/tests/master_load_tests.cpp
@@ -0,0 +1,559 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <mesos/mesos.hpp>
+
+#include <process/async.hpp>
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include "tests/mesos.hpp"
+
+using mesos::authorization::VIEW_EXECUTOR;
+using mesos::authorization::VIEW_FLAGS;
+using mesos::authorization::VIEW_FRAMEWORK;
+using mesos::authorization::VIEW_ROLE;
+using mesos::authorization::VIEW_TASK;
+
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+
+using mesos::master::detector::MasterDetector;
+
+using process::async;
+using process::Clock;
+using process::delay;
+using process::Future;
+using process::Message;
+using process::Owned;
+using process::PID;
+using process::Promise;
+using process::Time;
+
+using process::http::Response;
+using process::http::Request;
+using process::http::Headers;
+
+using testing::SaveArg;
+
+
+// The tests in this file are designed to verify that the caching
+// of read-only requests inside a Mesos master is implemented correctly,
+// i.e. cacheable requests are cached and non-cacheable requests will
+// return different responses.
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class BlockingAuthorizer;
+
+
+class MasterLoadTest : public MesosTest {
+protected:
+  // Describes one HTTP request that the tests send to the master.
+  struct RequestDescriptor {
+    std::string endpoint;  // Path handed to `http::get()`, e.g. "/state".
+    std::string principal;  // Not sent itself; read back by the test bodies.
+    std::string query;  // Raw query string; may be empty.
+    process::http::Headers headers;  // Headers sent along with the request.
+
+    bool operator<(const RequestDescriptor& other) const;  // For map keys.
+  };
+
+  // Prepare a mock cluster with 1 master, 1 agent and 1 framework,
+  // with the given authorizer being wrapped in a `BlockingAuthorizer`.
+  void prepareCluster(Authorizer* authorizer);
+
+  // This function launches a fixed number of equivalent requests per passed
+  // request descriptor, while manipulating the master in order to
+  // ensure all requests will appear consecutively in the master queue.
+  // The returned map associates each response with the descriptor it was
+  // created from.
+  std::multimap<RequestDescriptor, Future<Response>>
+  launchSimultaneousRequests(
+      const std::vector<RequestDescriptor>& descriptors);
+
+  // The "mock cluster" created by `prepareCluster()`. These are `protected`
+  // so that the test body can access them if required.
+  Owned<BlockingAuthorizer> authorizer_;  // Wraps the caller's authorizer.
+  Owned<cluster::Master> master_;
+  Owned<MasterDetector> detector_;  // Created from `master_`.
+  Owned<MockScheduler> scheduler_;
+  Owned<TestingMesosSchedulerDriver> driver_;
+  Owned<cluster::Slave> slave_;
+  FrameworkID frameworkId_;  // Saved from the scheduler's `registered()`.
+};
+
+
+// This authorizer will not satisfy any futures from `getObjectApprover()`
+// until it is told to, presumably from the test body.
+//
+// It effectively acts as a giant gate for certain requests.
+class BlockingAuthorizerProcess
+  : public process::Process<BlockingAuthorizerProcess>
+{
+public:
+  // Starts out blocked. `underlying` does the real authorization work and
+  // is only referenced through a raw pointer here.
+  BlockingAuthorizerProcess(Authorizer* underlying)
+    : ProcessBase(process::ID::generate("blocking-authorizer")),
+      underlying_(underlying),
+      blocked_(true) {}
+
+  // Forwarded to the underlying authorizer without any gating.
+  Future<bool> authorized(const authorization::Request& request)
+  {
+    return underlying_->authorized(request);
+  }
+
+  // The underlying authorizer is always asked right away. While blocked,
+  // the caller only gets a placeholder future that `unleash()` completes
+  // later; once unblocked, the underlying result is returned directly.
+  Future<Owned<ObjectApprover>> getObjectApprover(
+      const Option<authorization::Subject>& subject,
+      const authorization::Action& action)
+  {
+    Future<Owned<ObjectApprover>> approver =
+      underlying_->getObjectApprover(subject, action);
+
+    if (blocked_) {
+      // Park the real result next to a fresh promise. Both queues grow in
+      // lockstep so that `unleash()` can pair the entries up again.
+      futures_.push(approver);
+      promises_.emplace();
+      return promises_.back().future();
+    }
+
+    return approver;
+  }
+
+  // Number of `getObjectApprover()` calls that are currently held back.
+  Future<size_t> pending()
+  {
+    return promises_.size();
+  }
+
+  // Releases every call held back so far and lets all subsequent calls
+  // to `getObjectApprover()` pass straight through.
+  Future<Nothing> unleash()
+  {
+    CHECK_EQ(promises_.size(), futures_.size());
+
+    for (; !promises_.empty(); futures_.pop(), promises_.pop()) {
+      promises_.front().associate(futures_.front());
+    }
+
+    blocked_ = false;
+
+    return Nothing();
+  }
+
+private:
+  Authorizer* underlying_;
+  std::queue<Future<Owned<ObjectApprover>>> futures_;
+  std::queue<Promise<Owned<ObjectApprover>>> promises_;
+  bool blocked_;
+};
+
+
+class BlockingAuthorizer : public Authorizer
+{
+public:
+  // Creates and spawns the backing `BlockingAuthorizerProcess`, which
+  // forwards to `underlying`.
+  BlockingAuthorizer(Authorizer* underlying)
+    : process_(new BlockingAuthorizerProcess(underlying))
+  {
+    process::spawn(process_.get());
+  }
+
+  // Terminates the backing process and waits for it to finish.
+  ~BlockingAuthorizer()
+  {
+    process::terminate(process_.get());
+    process::wait(process_.get());
+  }
+
+  // All calls below are dispatched onto the backing process.
+
+  Future<bool> authorized(const authorization::Request& request) override
+  {
+    const PID<BlockingAuthorizerProcess> pid = process_->self();
+
+    return process::dispatch(
+        pid, &BlockingAuthorizerProcess::authorized, request);
+  }
+
+  Future<Owned<ObjectApprover>> getObjectApprover(
+      const Option<authorization::Subject>& subject,
+      const authorization::Action& action) override
+  {
+    const PID<BlockingAuthorizerProcess> pid = process_->self();
+
+    return process::dispatch(
+        pid, &BlockingAuthorizerProcess::getObjectApprover, subject, action);
+  }
+
+  // Number of `getObjectApprover()` calls currently held back.
+  Future<size_t> pending()
+  {
+    const PID<BlockingAuthorizerProcess> pid = process_->self();
+
+    return process::dispatch(pid, &BlockingAuthorizerProcess::pending);
+  }
+
+  // Opens the gate, see `BlockingAuthorizerProcess::unleash()`.
+  Future<Nothing> unleash()
+  {
+    const PID<BlockingAuthorizerProcess> pid = process_->self();
+
+    return process::dispatch(pid, &BlockingAuthorizerProcess::unleash);
+  }
+
+private:
+  Owned<BlockingAuthorizerProcess> process_;
+};
+
+
+void MasterLoadTest::prepareCluster(Authorizer* authorizer)
+{
+  // Start a master whose authorizer is gated by a `BlockingAuthorizer`.
+  authorizer_.reset(new BlockingAuthorizer(authorizer));
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(
+      authorizer_.get(), masterFlags);
+
+  ASSERT_SOME(master);
+  master_ = master.get();
+  detector_ = master_->createDetector();
+  // Both registration futures are set up before anything is started.
+  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
+    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Start a framework and remember the ID it gets registered with.
+  scheduler_.reset(new MockScheduler());
+  driver_.reset(new TestingMesosSchedulerDriver(
+      scheduler_.get(), detector_.get()));
+
+  EXPECT_CALL(*scheduler_, registered(driver_.get(), _, _))
+    .WillOnce(SaveArg<1>(&frameworkId_));
+
+  driver_->start();
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  // Start an agent and wait until it has registered with the master.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector_.get(), slaveFlags);
+
+  ASSERT_SOME(slave);
+  slave_ = slave.get();
+
+  AWAIT_READY(slaveRegisteredMessage);
+}
+
+
+// Sends a fixed number of identical HTTP GET requests per descriptor and
+// arranges for all of them to reach the master's queue back to back:
+// the requests first pile up behind the blocked authorizer, then the
+// master actor is stalled while the authorizer is released.
+//
+// Returns one (descriptor, pending response) entry per request sent. If an
+// assertion inside fails, the failure is recorded with gtest and the map
+// built so far is still returned.
+std::multimap<MasterLoadTest::RequestDescriptor, Future<Response>>
+MasterLoadTest::launchSimultaneousRequests(
+    const std::vector<RequestDescriptor>& descriptors)
+{
+  // NOTE: On Mac, the number of open files (and thus tcp connections)
+  // is limited to 256 by default, so this number is tweaked to stay slightly
+  // lower than that at 40*5==200 connections for the most demanding test.
+  const size_t REQUESTS_PER_DESCRIPTOR = 40;
+  const size_t totalRequests = REQUESTS_PER_DESCRIPTOR * descriptors.size();
+
+  std::multimap<RequestDescriptor, Future<Response>> requests;
+
+  // Need this wrapper since `AWAIT_READY()` expects a `void` return type.
+  [&] {
+    // Send out all http requests based on the specifications
+    // found in `descriptors` and store the result in `requests`.
+    foreach (const RequestDescriptor& descriptor, descriptors) {
+      for (size_t i = 0; i < REQUESTS_PER_DESCRIPTOR; ++i) {
+        Future<Response> response = process::http::get(
+            master_->pid,
+            descriptor.endpoint,
+            descriptor.query,
+            descriptor.headers);
+
+        requests.emplace(descriptor, response);
+      }
+    }
+
+    // Wait until all the HTTP events have reached the master and are now
+    // awaiting authorization.  There might be some other requests that get
+    // mixed into the authorizer, so we must have ample requests in the
+    // test body to ensure cache hits.
+    Time whileLoopStartTime = Clock::now();
+    Future<size_t> pendingHttpCalls;
+    do {
+      pendingHttpCalls = authorizer_->pending();
+      AWAIT_READY(pendingHttpCalls);
+      // Protect against a potential infinite loop introduced by future bugs.
+      ASSERT_TRUE(Clock::now() - whileLoopStartTime < Seconds(20));
+    } while (pendingHttpCalls.get() < totalRequests);
+
+
+    // Now block the master actor, since we don't want the master to start
+    // batching until it is queued up with all the HTTP requests.
+    // NOTE: This function might be out of scope when the dispatch is
+    // scheduled, so we need to pass `masterBlocker` by value.
+    auto masterBlocker = std::make_shared<Promise<Nothing>>();
+    process::dispatch(master_->pid, [masterBlocker]() {
+      masterBlocker->future().await();
+    });
+
+    // Unblock the BlockingAuthorizer.
+    // This should trigger all the deferrals onto the master from the
+    // Authorizer's thread. When this future completes, the master's queue
+    // should be full of batched requests.
+    //
+    // NOTE: A non-fatal check is used on purpose. A fatal assertion would
+    // return from this lambda right here and skip the `set()` below,
+    // leaving the master actor blocked forever and hanging the teardown.
+    AWAIT_EXPECT_READY(authorizer_->unleash());
+
+    // Unblock the master now, so it can perform the batching.
+    masterBlocker->set(Nothing());
+  }();
+
+  return requests;
+}
+
+
+// Orders descriptors by endpoint, then principal, then query string, so
+// that descriptors differing in any of those fields are not treated as
+// equivalent keys by ordered containers.
+//
+// NOTE: `headers` does not take part in the ordering.
+bool MasterLoadTest::RequestDescriptor::operator<(
+    const RequestDescriptor& other) const
+{
+  if (endpoint != other.endpoint) {
+    return endpoint < other.endpoint;
+  }
+
+  if (principal != other.principal) {
+    return principal < other.principal;
+  }
+
+  return query < other.query;
+}
+
+
+// Test that simultaneous responses to various different endpoints
+// all return the expected result.
+TEST_F(MasterLoadTest, SimultaneousBatchedRequests)
+{
+  MockAuthorizer mockAuthorizer;
+  prepareCluster(&mockAuthorizer);
+
+  // Set up the actual test: one descriptor per read-only endpoint, all
+  // sharing the same credentials.
+  RequestDescriptor descriptor1;
+  descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  descriptor1.endpoint = "/state";
+
+  RequestDescriptor descriptor2 = descriptor1;
+  descriptor2.endpoint = "/state-summary";
+
+  RequestDescriptor descriptor3 = descriptor1;
+  descriptor3.endpoint = "/frameworks";
+
+  RequestDescriptor descriptor4 = descriptor1;
+  descriptor4.endpoint = "/slaves";
+
+  RequestDescriptor descriptor5 = descriptor1;
+  descriptor5.endpoint = "/roles";
+
+  auto responses = launchSimultaneousRequests(
+      {descriptor1, descriptor2, descriptor3, descriptor4, descriptor5});
+
+  // Neither the handler nor the authorizer used to compute the reference
+  // responses depends on the individual request, so both are created once
+  // here rather than once per response.
+  mesos::internal::master::Master* master = master_->master.get();
+  mesos::internal::master::Master::ReadOnlyHandler readOnlyHandler(master);
+  MockAuthorizer authorizer;
+
+  foreachpair (
+      const RequestDescriptor& request,
+      Future<Response>& response,
+      responses)
+  {
+    AWAIT_READY(response);
+
+    // TODO(bevers): Ideally we would not use HTTP at all to generate
+    // the reference response, but some master-internal function
+    // like `model(Summary<Master>)`.
+    Try<hashmap<std::string, std::string>> queryParameters_ =
+      process::http::query::decode(request.query);
+
+    ASSERT_SOME(queryParameters_);
+    const hashmap<std::string, std::string>& queryParameters =
+      queryParameters_.get();
+
+    process::http::authentication::Principal principal(request.principal);
+    Future<Owned<ObjectApprovers>> approvers = ObjectApprovers::create(
+        &authorizer,
+        principal,
+        {VIEW_ROLE, VIEW_FLAGS, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR});
+
+    // Fail with the usual test timeout instead of blocking indefinitely
+    // in `Future::get()` should the approvers never become available.
+    AWAIT_READY(approvers);
+
+    Response reference;
+    if (request.endpoint == "/state") {
+      reference = readOnlyHandler.state(queryParameters, approvers.get());
+    } else if (request.endpoint == "/state-summary") {
+      reference =
+        readOnlyHandler.stateSummary(queryParameters, approvers.get());
+    } else if (request.endpoint == "/roles") {
+      reference = readOnlyHandler.roles(queryParameters, approvers.get());
+    } else if (request.endpoint == "/frameworks") {
+      reference = readOnlyHandler.frameworks(queryParameters, approvers.get());
+    } else if (request.endpoint == "/slaves") {
+      reference = readOnlyHandler.slaves(queryParameters, approvers.get());
+    } else {
+      UNREACHABLE();
+    }
+
+    EXPECT_EQ(reference.body, response->body);
+  }
+
+  // Ensure that we actually hit the metrics code path while executing
+  // the test.
+  JSON::Object metrics = Metrics();
+  ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
+  ASSERT_GT(
+      metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
+      0u);
+}
+
+
+// Test that simultaneous requests on a single endpoint for two
+// different principals return different results.
+TEST_F(MasterLoadTest, Principals)
+{
+  // Set up a proper authorizer for this test.
+  master::Flags flags = CreateMasterFlags();
+
+  {
+    // Default principal is allowed to view frameworks.
+    mesos::ACL::ViewFramework* acl = flags.acls->add_view_frameworks();
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+    acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Default principal 2 is not allowed to view frameworks.
+    mesos::ACL::ViewFramework* acl = flags.acls->add_view_frameworks();
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
+    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Check the result of `create()` before using it, so that a failure
+  // shows up as a regular test failure.
+  Try<Authorizer*> created = Authorizer::create(flags.acls.get());
+  ASSERT_SOME(created);
+
+  // Take ownership of the returned raw pointer so the authorizer is
+  // released when the test body ends instead of being leaked.
+  Owned<Authorizer> localAuthorizer(created.get());
+  prepareCluster(localAuthorizer.get());
+
+  // Set up the requests with correct principals.
+  RequestDescriptor descriptor1;
+  descriptor1.endpoint = "/frameworks";
+  descriptor1.principal = DEFAULT_CREDENTIAL.principal();
+  descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  RequestDescriptor descriptor2 = descriptor1;
+  descriptor2.principal = DEFAULT_CREDENTIAL_2.principal();
+  descriptor2.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
+
+  auto responses = launchSimultaneousRequests({descriptor1, descriptor2});
+
+  // Minimal document that a response listing our framework must contain.
+  JSON::Value expected = JSON::parse(
+      "{"
+        "\"frameworks\": [{"
+            "\"id\": \""  + stringify(frameworkId_) + "\""
+        "}]"
+      "}"
+  ).get();
+
+  foreachpair (
+      const RequestDescriptor& request,
+      Future<Response>& response,
+      responses)
+  {
+    AWAIT_READY(response);
+
+    Try<JSON::Value> jsonResponse = JSON::parse(response->body);
+    ASSERT_SOME(jsonResponse);
+
+    // Only the principal with the permissive ACL may see the framework.
+    if (request.principal == DEFAULT_CREDENTIAL.principal()) {
+      EXPECT_TRUE(jsonResponse->contains(expected))
+        << "Principal " << request.principal
+        << " got HTTP response: " << response->body;
+    } else {
+      EXPECT_FALSE(jsonResponse->contains(expected))
+        << "Principal " << request.principal
+        << " got HTTP response: " << response->body;
+    }
+  }
+
+  // Ensure that we actually hit the metrics code path while executing
+  // the test.
+  JSON::Object metrics = Metrics();
+  ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
+  ASSERT_GT(
+      metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
+      0u);
+}
+
+
+// Test that simultaneous requests on a single endpoint with
+// different query parameters produce different results.
+TEST_F(MasterLoadTest, QueryParameters)
+{
+  MockAuthorizer mockAuthorizer;
+  prepareCluster(&mockAuthorizer);
+
+  // Three requests against the same endpoint with the same credentials,
+  // differing only in their query string.
+  RequestDescriptor unfiltered;
+  unfiltered.endpoint = "/frameworks";
+  unfiltered.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  unfiltered.query = "";
+
+  RequestDescriptor unknownFramework = unfiltered;
+  unknownFramework.query = "framework_id=nonexisting-framework-id";
+
+  RequestDescriptor jsonpWrapped = unfiltered;
+  jsonpWrapped.query = "jsonp=xxx";
+
+  auto responses = launchSimultaneousRequests(
+      {unfiltered, unknownFramework, jsonpWrapped});
+
+  // Minimal document that a response listing our framework must contain.
+  JSON::Value expected = JSON::parse(
+      "{"
+        "\"frameworks\": [{"
+            "\"id\": \""  + stringify(frameworkId_) + "\""
+        "}]"
+      "}"
+  ).get();
+
+  foreachpair (
+      const RequestDescriptor& descriptor,
+      Future<Response>& reply,
+      responses)
+  {
+    AWAIT_READY(reply);
+
+    // The jsonp responses are only checked for the requested callback
+    // name and are not parsed as JSON.
+    const bool isJsonp = strings::contains(descriptor.query, "jsonp");
+    if (isJsonp) {
+      EXPECT_TRUE(strings::contains(reply->body, "xxx"))
+        << "Got HTTP response: " << reply->body;
+      continue;
+    }
+
+    Try<JSON::Value> parsed = JSON::parse(reply->body);
+    ASSERT_SOME(parsed);
+
+    // Only the request without a `framework_id` filter is expected to
+    // list the registered framework.
+    const bool hasFilter = strings::contains(descriptor.query, "framework_id");
+    if (!hasFilter) {
+      EXPECT_TRUE(parsed->contains(expected))
+        << "Got HTTP response: " << reply->body;
+    } else {
+      EXPECT_FALSE(parsed->contains(expected))
+        << "Got HTTP response: " << reply->body;
+    }
+  }
+
+  // Ensure that we actually hit the metrics code path while executing
+  // the test.
+  JSON::Object metrics = Metrics();
+  const JSON::Value& cacheHits = metrics.values["master/http_cache_hits"];
+
+  ASSERT_TRUE(cacheHits.is<JSON::Number>());
+  ASSERT_GT(cacheHits.as<JSON::Number>().as<size_t>(), 0u);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {