You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/08/26 23:09:59 UTC

[mesos] 03/05: Added draining tests for empty agents.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1e3661982eba6da71a5ca8178472ef762d9fc780
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Aug 7 09:02:01 2019 -0700

    Added draining tests for empty agents.
    
    This splits the existing agent draining tests into two variants:
    1) where the agent has nothing running, and
    2) where the agent has one task running.
    
    Review: https://reviews.apache.org/r/71316
---
 src/tests/master_draining_tests.cpp | 294 ++++++++++++++++++++++++++++++------
 1 file changed, 250 insertions(+), 44 deletions(-)

diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
index eae809f..674f5b5 100644
--- a/src/tests/master_draining_tests.cpp
+++ b/src/tests/master_draining_tests.cpp
@@ -42,6 +42,8 @@
 #include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
+#include "master/registry_operations.hpp"
+
 #include "messages/messages.hpp"
 
 #include "tests/cluster.hpp"
@@ -69,12 +71,12 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
-class MasterDrainingTest
+class MasterAlreadyDrainedTest
   : public MesosTest,
     public WithParamInterface<ContentType>
 {
 public:
-  // Creates a master, agent, framework, and launches one sleep task.
+  // Creates a master and agent.
   void SetUp() override
   {
     MesosTest::SetUp();
@@ -99,6 +101,251 @@ public:
 
     Clock::advance(agentFlags.registration_backoff_factor);
     AWAIT_READY(slaveRegisteredMessage);
+    agentId = evolve(slaveRegisteredMessage->slave_id());
+  }
+
+  void TearDown() override
+  {
+    slave.reset();
+    detector.reset();
+    master.reset();
+
+    Clock::resume();  // Presumably pairs with a `Clock::pause()` in SetUp.
+
+    MesosTest::TearDown();
+  }
+
+  master::Flags CreateMasterFlags() override
+  {
+    // Effectively turn off periodic allocations (via a very long interval) to
+    // avoid a race between them and `HierarchicalAllocator::updateAvailable()`.
+    master::Flags flags = MesosTest::CreateMasterFlags();
+    flags.allocation_interval = Seconds(1000);
+    return flags;
+  }
+
+  // Posts `call` to the master's "api/v1" endpoint, serialized as `contentType`
+  // with auth headers built from `credential`; returns the future response.
+  Future<http::Response> post(
+      const process::PID<master::Master>& pid,
+      const v1::master::Call& call,
+      const ContentType& contentType,
+      const Credential& credential = DEFAULT_CREDENTIAL)
+  {
+    http::Headers headers = createBasicAuthHeaders(credential);
+    headers["Accept"] = stringify(contentType);
+
+    return http::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+  }
+
+protected:
+  master::Flags masterFlags;
+  Owned<cluster::Master> master;
+  Owned<MasterDetector> detector;
+
+  slave::Flags agentFlags;
+  Owned<cluster::Slave> slave;
+  v1::AgentID agentId;
+};
+
+
+// These tests are parameterized by the content type of the HTTP request.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    MasterAlreadyDrainedTest,
+    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// When an operator submits a DRAIN_AGENT call, the agent with nothing running
+// should be immediately transitioned to the DRAINED state.
+TEST_P(MasterAlreadyDrainedTest, DrainAgent)
+{
+  Future<Nothing> registrarApplyDrained;
+  EXPECT_CALL(*master->registrar, apply(_))
+    .WillOnce(DoDefault())
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyDrained),
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.mutable_max_grace_period()->set_seconds(10);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  AWAIT_READY(registrarApplyDrained);
+
+  mesos::v1::DrainInfo drainInfo;
+  drainInfo.set_state(mesos::v1::DRAINED);
+  drainInfo.mutable_config()->set_mark_gone(false);  // Not set in the request.
+  drainInfo.mutable_config()->mutable_max_grace_period()
+    ->set_nanoseconds(Seconds(10).ns());  // Same 10s as the request, in ns.
+
+  // Ensure that the agent is deactivated and that its drain info is reflected
+  // in the master's GET_AGENTS response.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+
+    EXPECT_EQ(agent.drain_info(), drainInfo);
+    EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
+  }
+}
+
+
+// When an operator submits a DRAIN_AGENT call with 'mark_gone == true',
+// and the agent is not running anything, the agent should immediately be
+// marked gone.
+TEST_P(MasterAlreadyDrainedTest, DrainAgentMarkGone)
+{
+  // Since the agent has nothing running, the master should mark it gone right
+  // away, which entails sending a `ShutdownMessage`.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.set_mark_gone(true);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// When an operator submits a DRAIN_AGENT call for an agent that has gone
+// unreachable, the call should succeed, and the agent should be drained
+// if/when it returns to the cluster.
+TEST_P(MasterAlreadyDrainedTest, DrainAgentUnreachable)
+{
+  Future<Owned<master::RegistryOperation>> registrarApplyUnreachable;
+  EXPECT_CALL(*master->registrar, apply(_))
+    .WillOnce(DoAll(
+        FutureArg<0>(&registrarApplyUnreachable),
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)))
+    .WillRepeatedly(DoDefault());
+
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave->terminate();
+  slave.reset();
+
+  Clock::advance(masterFlags.agent_reregister_timeout);
+  AWAIT_READY(registrarApplyUnreachable);
+  ASSERT_NE(
+      nullptr,
+      dynamic_cast<master::MarkSlaveUnreachable*>(
+          registrarApplyUnreachable->get()));
+
+  // Start draining the unreachable agent.
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  // Bring the agent back, reusing the original agent flags.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMesage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMesage);
+
+  // Ensure that the master now reports the agent as deactivated and DRAINED.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+    EXPECT_EQ(mesos::v1::DRAINED, agent.drain_info().state());
+  }
+}
+
+
+class MasterDrainingTest
+  : public MasterAlreadyDrainedTest
+{
+public:
+  // Creates a master, agent, framework, and launches one sleep task.
+  void SetUp() override
+  {
+    MasterAlreadyDrainedTest::SetUp();
 
     // Create the framework.
     scheduler = std::make_shared<v1::MockHTTPScheduler>();
@@ -134,7 +381,6 @@ public:
     ASSERT_FALSE(offers->offers().empty());
 
     const v1::Offer& offer = offers->offers(0);
-    agentId = offer.agent_id();
 
     Try<v1::Resources> resources =
       v1::Resources::parse("cpus:0.1;mem:64;disk:64");
@@ -189,51 +435,11 @@ public:
   {
     mesos.reset();
     scheduler.reset();
-    slave.reset();
-    detector.reset();
-    master.reset();
-
-    Clock::resume();
-
-    MesosTest::TearDown();
-  }
-
-  master::Flags CreateMasterFlags() override
-  {
-    // Turn off periodic allocations to avoid the race between
-    // `HierarchicalAllocator::updateAvailable()` and periodic allocations.
-    master::Flags flags = MesosTest::CreateMasterFlags();
-    flags.allocation_interval = Seconds(1000);
-    return flags;
-  }
-
-  // Helper function to post a request to "/api/v1" master endpoint and return
-  // the response.
-  Future<http::Response> post(
-      const process::PID<master::Master>& pid,
-      const v1::master::Call& call,
-      const ContentType& contentType)
-  {
-    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-    headers["Accept"] = stringify(contentType);
 
-    return http::post(
-        pid,
-        "api/v1",
-        headers,
-        serialize(contentType, call),
-        stringify(contentType));
+    MasterAlreadyDrainedTest::TearDown();
   }
 
 protected:
-  master::Flags masterFlags;
-  Owned<cluster::Master> master;
-  Owned<MasterDetector> detector;
-
-  slave::Flags agentFlags;
-  Owned<cluster::Slave> slave;
-  v1::AgentID agentId;
-
   std::shared_ptr<v1::MockHTTPScheduler> scheduler;
   v1::FrameworkInfo frameworkInfo;
   std::shared_ptr<v1::scheduler::TestMesos> mesos;