You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/08/26 23:09:56 UTC

[mesos] branch master updated (c104977 -> 50dcd56)

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from c104977  Updated site's middleman versions.
     new 5124b29  Moved master-side agent draining tests into a separate file.
     new 4f07839  Refactored master draining test setup.
     new 1e36619  Added draining tests for empty agents.
     new 5c57128  Added draining test for momentarily disconnected agents.
     new 50dcd56  Added agent reactivations to the existing agent draining tests.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/Makefile.am                     |    3 +-
 src/tests/CMakeLists.txt            |    1 +
 src/tests/api_tests.cpp             |  541 -------------------
 src/tests/master_draining_tests.cpp | 1018 +++++++++++++++++++++++++++++++++++
 4 files changed, 1021 insertions(+), 542 deletions(-)
 create mode 100644 src/tests/master_draining_tests.cpp


[mesos] 02/05: Refactored master draining test setup.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4f078398f7010d982a1c4ee95a1e3f628813e6fe
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Mon Jul 29 19:43:31 2019 -0700

    Refactored master draining test setup.
    
    Tests of this feature will generally require a master, agent, framework,
    and a single task to be launched at the beginning of the test.
    This moves this common code into the test SetUp.
    
    This also changes the `post(...)` helper to return the http::Response
    object instead of parsing it.  The response to a DRAIN_AGENT call
    does not contain an object, so the tests were not checking the
    response before.
    
    Review: https://reviews.apache.org/r/71315
---
 src/tests/master_draining_tests.cpp | 494 +++++++++++++-----------------------
 1 file changed, 175 insertions(+), 319 deletions(-)

diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
index 16d0c85..eae809f 100644
--- a/src/tests/master_draining_tests.cpp
+++ b/src/tests/master_draining_tests.cpp
@@ -14,6 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <memory>
 #include <string>
 
 #include <mesos/http.hpp>
@@ -73,6 +74,130 @@ class MasterDrainingTest
     public WithParamInterface<ContentType>
 {
 public:
+  // Creates a master, agent, framework, and launches one sleep task.
+  void SetUp() override
+  {
+    MesosTest::SetUp();
+
+    Clock::pause();
+
+    // Create the master.
+    masterFlags = CreateMasterFlags();
+    Try<Owned<cluster::Master>> _master = StartMaster(masterFlags);
+    ASSERT_SOME(_master);
+    master = _master.get();
+
+    Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+      FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+    // Create the agent.
+    agentFlags = CreateSlaveFlags();
+    detector = master.get()->createDetector();
+    Try<Owned<cluster::Slave>> _slave = StartSlave(detector.get(), agentFlags);
+    ASSERT_SOME(_slave);
+    slave = _slave.get();
+
+    Clock::advance(agentFlags.registration_backoff_factor);
+    AWAIT_READY(slaveRegisteredMessage);
+
+    // Create the framework.
+    scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+    frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+    frameworkInfo.set_checkpoint(true);
+    frameworkInfo.add_capabilities()->set_type(
+        v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+    Future<v1::scheduler::Event::Subscribed> subscribed;
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    EXPECT_CALL(*scheduler, heartbeat(_))
+      .WillRepeatedly(Return()); // Ignore heartbeats.
+
+    Future<v1::scheduler::Event::Offers> offers;
+    EXPECT_CALL(*scheduler, offers(_, _))
+      .WillOnce(FutureArg<1>(&offers))
+      .WillRepeatedly(Return());
+
+    mesos = std::make_shared<v1::scheduler::TestMesos>(
+        master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+    AWAIT_READY(subscribed);
+    frameworkId = subscribed->framework_id();
+
+    // Launch a sleep task.
+    AWAIT_READY(offers);
+    ASSERT_FALSE(offers->offers().empty());
+
+    const v1::Offer& offer = offers->offers(0);
+    agentId = offer.agent_id();
+
+    Try<v1::Resources> resources =
+      v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+    ASSERT_SOME(resources);
+
+    taskInfo = v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+    testing::Sequence updateSequence;
+    Future<v1::scheduler::Event::Update> startingUpdate;
+    Future<v1::scheduler::Event::Update> runningUpdate;
+
+    // Make sure the agent receives these two acknowledgements.
+    Future<StatusUpdateAcknowledgementMessage> startingAck =
+      FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+    Future<StatusUpdateAcknowledgementMessage> runningAck =
+      FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+    EXPECT_CALL(
+        *scheduler,
+        update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+      .InSequence(updateSequence)
+      .WillOnce(DoAll(
+          FutureArg<1>(&startingUpdate),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+    EXPECT_CALL(
+        *scheduler,
+        update(_, AllOf(
+              TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+              TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+      .InSequence(updateSequence)
+      .WillOnce(DoAll(
+          FutureArg<1>(&runningUpdate),
+          v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+    mesos->send(
+        v1::createCallAccept(
+            frameworkId,
+            offer,
+            {v1::LAUNCH({taskInfo})}));
+
+    AWAIT_READY(startingUpdate);
+    AWAIT_READY(startingAck);
+    AWAIT_READY(runningUpdate);
+    AWAIT_READY(runningAck);
+  }
+
+  void TearDown() override
+  {
+    mesos.reset();
+    scheduler.reset();
+    slave.reset();
+    detector.reset();
+    master.reset();
+
+    Clock::resume();
+
+    MesosTest::TearDown();
+  }
+
   master::Flags CreateMasterFlags() override
   {
     // Turn off periodic allocations to avoid the race between
@@ -84,13 +209,12 @@ public:
 
   // Helper function to post a request to "/api/v1" master endpoint and return
   // the response.
-  Future<v1::master::Response> post(
+  Future<http::Response> post(
       const process::PID<master::Master>& pid,
       const v1::master::Call& call,
-      const ContentType& contentType,
-      const Credential& credential = DEFAULT_CREDENTIAL)
+      const ContentType& contentType)
   {
-    http::Headers headers = createBasicAuthHeaders(credential);
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
     headers["Accept"] = stringify(contentType);
 
     return http::post(
@@ -98,15 +222,24 @@ public:
         "api/v1",
         headers,
         serialize(contentType, call),
-        stringify(contentType))
-      .then([contentType](const http::Response& response)
-            -> Future<v1::master::Response> {
-        if (response.status != http::OK().status) {
-          return Failure("Unexpected response status " + response.status);
-        }
-        return deserialize<v1::master::Response>(contentType, response.body);
-      });
+        stringify(contentType));
   }
+
+protected:
+  master::Flags masterFlags;
+  Owned<cluster::Master> master;
+  Owned<MasterDetector> detector;
+
+  slave::Flags agentFlags;
+  Owned<cluster::Slave> slave;
+  v1::AgentID agentId;
+
+  std::shared_ptr<v1::MockHTTPScheduler> scheduler;
+  v1::FrameworkInfo frameworkInfo;
+  std::shared_ptr<v1::scheduler::TestMesos> mesos;
+  v1::FrameworkID frameworkId;
+
+  v1::TaskInfo taskInfo;
 };
 
 
@@ -121,99 +254,6 @@ INSTANTIATE_TEST_CASE_P(
 // running tasks.
 TEST_P(MasterDrainingTest, DrainAgent)
 {
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
-    .WillRepeatedly(Return());
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(runningUpdate);
-
   Future<v1::scheduler::Event::Update> killedUpdate;
   EXPECT_CALL(
       *scheduler,
@@ -225,11 +265,11 @@ TEST_P(MasterDrainingTest, DrainAgent)
         v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   Future<Nothing> registrarApplyDrained;
-  EXPECT_CALL(*master.get()->registrar, apply(_))
+  EXPECT_CALL(*master->registrar, apply(_))
     .WillOnce(DoDefault())
     .WillOnce(DoAll(
         FutureSatisfy(&registrarApplyDrained),
-        Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)));
 
   ContentType contentType = GetParam();
 
@@ -242,7 +282,9 @@ TEST_P(MasterDrainingTest, DrainAgent)
     call.set_type(v1::master::Call::DRAIN_AGENT);
     call.mutable_drain_agent()->CopyFrom(drainAgent);
 
-    post(master.get()->pid, call, contentType);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
   }
 
   AWAIT_READY(killedUpdate);
@@ -262,16 +304,19 @@ TEST_P(MasterDrainingTest, DrainAgent)
     v1::master::Call call;
     call.set_type(v1::master::Call::GET_AGENTS);
 
-    Future<v1::master::Response> response =
-      post(master.get()->pid, call, contentType);
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
 
-    AWAIT_READY(response);
-    ASSERT_TRUE(response->IsInitialized());
-    ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
-    ASSERT_EQ(response->get_agents().agents_size(), 1);
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
 
     const v1::master::Response::GetAgents::Agent& agent =
-        response->get_agents().agents(0);
+        getAgents->get_agents().agents(0);
 
     EXPECT_EQ(agent.deactivated(), true);
 
@@ -283,13 +328,13 @@ TEST_P(MasterDrainingTest, DrainAgent)
   // '/state' response.
   {
     Future<process::http::Response> response = process::http::get(
-        master.get()->pid,
+        master->pid,
         "state",
         None(),
         createBasicAuthHeaders(DEFAULT_CREDENTIAL));
 
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
-    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_ASSERT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
 
     Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
     ASSERT_SOME(parse);
@@ -310,7 +355,7 @@ TEST_P(MasterDrainingTest, DrainAgent)
   // '/state-summary' response.
   {
     Future<process::http::Response> response = process::http::get(
-        master.get()->pid,
+        master->pid,
         "state-summary",
         None(),
         createBasicAuthHeaders(DEFAULT_CREDENTIAL));
@@ -340,99 +385,6 @@ TEST_P(MasterDrainingTest, DrainAgent)
 // once terminal ACKs have been received.
 TEST_P(MasterDrainingTest, DrainAgentMarkGone)
 {
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
-    .WillRepeatedly(Return());
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(runningUpdate);
-
   Future<v1::scheduler::Event::Update> goneUpdate;
   EXPECT_CALL(
       *scheduler,
@@ -459,7 +411,9 @@ TEST_P(MasterDrainingTest, DrainAgentMarkGone)
     call.set_type(v1::master::Call::DRAIN_AGENT);
     call.mutable_drain_agent()->CopyFrom(drainAgent);
 
-    post(master.get()->pid, call, contentType);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
   }
 
   AWAIT_READY(goneUpdate);
@@ -472,100 +426,11 @@ TEST_P(MasterDrainingTest, DrainAgentMarkGone)
 // if/when it returns to the cluster.
 TEST_P(MasterDrainingTest, DrainAgentUnreachable)
 {
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true);
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
   testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
   Future<v1::scheduler::Event::Update> unreachableUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate;
   Future<v1::scheduler::Event::Update> killedUpdate;
 
-  // Make absolutely sure the agent receives these two acknowledgements
-  // before forcing the agent offline.
-  Future<StatusUpdateAcknowledgementMessage> startingAck =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-  Future<StatusUpdateAcknowledgementMessage> runningAck =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
   EXPECT_CALL(
       *scheduler,
       update(_, AllOf(
@@ -585,7 +450,7 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable)
             TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
     .InSequence(updateSequence)
     .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate2),
+        FutureArg<1>(&runningUpdate),
         v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   EXPECT_CALL(
@@ -598,20 +463,9 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable)
         FutureArg<1>(&killedUpdate),
         v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(startingAck);
-  AWAIT_READY(runningUpdate);
-  AWAIT_READY(runningAck);
-
   // Simulate an agent crash, so that it disconnects from the master.
-  slave.get()->terminate();
-  slave->reset();
+  slave->terminate();
+  slave.reset();
 
   Clock::advance(masterFlags.agent_reregister_timeout);
   AWAIT_READY(unreachableUpdate);
@@ -627,7 +481,9 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable)
     call.set_type(v1::master::Call::DRAIN_AGENT);
     call.mutable_drain_agent()->CopyFrom(drainAgent);
 
-    post(master.get()->pid, call, contentType);
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
   }
 
   // Bring the agent back.
@@ -653,7 +509,7 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable)
 
   // The agent should be told to drain once it reregisters.
   AWAIT_READY(drainSlaveMesage);
-  AWAIT_READY(runningUpdate2);
+  AWAIT_READY(runningUpdate);
   AWAIT_READY(killedUpdate);
 }
 


[mesos] 01/05: Moved master-side agent draining tests into a separate file.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5124b290ddc368e2e7cc3d56173fb4b3137af620
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Jul 24 15:45:22 2019 -0700

    Moved master-side agent draining tests into a separate file.
    
    The test bodies were not changed, besides renaming the test class.
    
    Review: https://reviews.apache.org/r/71314
---
 src/Makefile.am                     |   3 +-
 src/tests/CMakeLists.txt            |   1 +
 src/tests/api_tests.cpp             | 541 -----------------------------
 src/tests/master_draining_tests.cpp | 662 ++++++++++++++++++++++++++++++++++++
 4 files changed, 665 insertions(+), 542 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index a89cd61..577acfd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2608,7 +2608,8 @@ mesos_tests_SOURCES =						\
   tests/master_allocator_tests.cpp				\
   tests/master_authorization_tests.cpp				\
   tests/master_benchmarks.cpp					\
-  tests/master_contender_detector_tests.cpp     \
+  tests/master_contender_detector_tests.cpp			\
+  tests/master_draining_tests.cpp				\
   tests/master_load_tests.cpp					\
   tests/master_maintenance_tests.cpp				\
   tests/master_quota_tests.cpp					\
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 04c552a..1e53b39 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -105,6 +105,7 @@ set(MESOS_TESTS_SRC
   hook_tests.cpp
   http_authentication_tests.cpp
   http_fault_tolerance_tests.cpp
+  master_draining_tests.cpp
   master_load_tests.cpp
   master_maintenance_tests.cpp
   master_slave_reconciliation_tests.cpp
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index a735a20..bd207ea 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -5470,547 +5470,6 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable)
 }
 
 
-// When an operator submits a DRAIN_AGENT call, the agent should kill all
-// running tasks.
-TEST_P(MasterAPITest, DrainAgent)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
-    .WillRepeatedly(Return());
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(runningUpdate);
-
-  Future<v1::scheduler::Event::Update> killedUpdate;
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
-    .WillOnce(DoAll(
-        FutureArg<1>(&killedUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  Future<Nothing> registrarApplyDrained;
-  EXPECT_CALL(*master.get()->registrar, apply(_))
-    .WillOnce(DoDefault())
-    .WillOnce(DoAll(
-        FutureSatisfy(&registrarApplyDrained),
-        Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
-
-  ContentType contentType = GetParam();
-
-  {
-    v1::master::Call::DrainAgent drainAgent;
-    drainAgent.mutable_agent_id()->CopyFrom(agentId);
-    drainAgent.mutable_max_grace_period()->set_seconds(0);
-
-    v1::master::Call call;
-    call.set_type(v1::master::Call::DRAIN_AGENT);
-    call.mutable_drain_agent()->CopyFrom(drainAgent);
-
-    post(master.get()->pid, call, contentType);
-  }
-
-  AWAIT_READY(killedUpdate);
-  AWAIT_READY(registrarApplyDrained);
-
-  // Ensure that the update acknowledgement has been processed.
-  Clock::settle();
-
-  mesos::v1::DrainInfo drainInfo;
-  drainInfo.set_state(mesos::v1::DRAINED);
-  drainInfo.mutable_config()->set_mark_gone(false);
-  drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0);
-
-  // Ensure that the agent's drain info is reflected in the master's
-  // GET_AGENTS response.
-  {
-    v1::master::Call call;
-    call.set_type(v1::master::Call::GET_AGENTS);
-
-    Future<v1::master::Response> response =
-      post(master.get()->pid, call, contentType);
-
-    AWAIT_READY(response);
-    ASSERT_TRUE(response->IsInitialized());
-    ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
-    ASSERT_EQ(response->get_agents().agents_size(), 1);
-
-    const v1::master::Response::GetAgents::Agent& agent =
-        response->get_agents().agents(0);
-
-    EXPECT_EQ(agent.deactivated(), true);
-
-    EXPECT_EQ(agent.drain_info(), drainInfo);
-    EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
-  }
-
-  // Ensure that the agent's drain info is reflected in the master's
-  // '/state' response.
-  {
-    Future<process::http::Response> response = process::http::get(
-        master.get()->pid,
-        "state",
-        None(),
-        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
-    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
-    ASSERT_SOME(parse);
-
-    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
-        "slaves[0].drain_info");
-
-    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
-
-    Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>(
-        "slaves[0].estimated_drain_start_time_seconds");
-
-    ASSERT_SOME(stateDrainStartTime);
-    EXPECT_LT(0, stateDrainStartTime->as<int>());
-  }
-
-  // Ensure that the agent's drain info is reflected in the master's
-  // '/state-summary' response.
-  {
-    Future<process::http::Response> response = process::http::get(
-        master.get()->pid,
-        "state-summary",
-        None(),
-        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
-    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
-    ASSERT_SOME(parse);
-
-    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
-        "slaves[0].drain_info");
-
-    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
-
-    Result<JSON::Number> stateDrainStartTime =
-      parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds");
-
-    ASSERT_SOME(stateDrainStartTime);
-    EXPECT_LT(0, stateDrainStartTime->as<int>());
-  }
-}
-
-
-// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
-// agent should kill all running tasks and the master should mark the agent gone
-// once terminal ACKs have been received.
-TEST_P(MasterAPITest, DrainAgentMarkGone)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
-    .WillRepeatedly(Return());
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(runningUpdate);
-
-  Future<v1::scheduler::Event::Update> goneUpdate;
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
-    .WillOnce(DoAll(
-        FutureArg<1>(&goneUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  // When the terminal ACK is received by the master, the agent should be marked
-  // gone, which entails sending a `ShutdownMessage`.
-  Future<ShutdownMessage> shutdownMessage =
-    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
-
-  ContentType contentType = GetParam();
-
-  {
-    v1::master::Call::DrainAgent drainAgent;
-    drainAgent.mutable_agent_id()->CopyFrom(agentId);
-    drainAgent.set_mark_gone(true);
-
-    v1::master::Call call;
-    call.set_type(v1::master::Call::DRAIN_AGENT);
-    call.mutable_drain_agent()->CopyFrom(drainAgent);
-
-    post(master.get()->pid, call, contentType);
-  }
-
-  AWAIT_READY(goneUpdate);
-  AWAIT_READY(shutdownMessage);
-}
-
-
-// When an operator submits a DRAIN_AGENT call with an agent that has gone
-// unreachable, the call should succeed, and the agent should be drained
-// if/when it returns to the cluster.
-TEST_P(MasterAPITest, DrainAgentUnreachable)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true);
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-  Future<v1::scheduler::Event::Update> unreachableUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate2;
-  Future<v1::scheduler::Event::Update> killedUpdate;
-
-  // Make absolutely sure the agent receives these two acknowledgements
-  // before forcing the agent offline.
-  Future<StatusUpdateAcknowledgementMessage> startingAck =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-  Future<StatusUpdateAcknowledgementMessage> runningAck =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&unreachableUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  // When the agent is brought back, we expect a TASK_RUNNING followed by
-  // a TASK_KILLED (due to draining).
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate2),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&killedUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(startingAck);
-  AWAIT_READY(runningUpdate);
-  AWAIT_READY(runningAck);
-
-  // Simulate an agent crash, so that it disconnects from the master.
-  slave.get()->terminate();
-  slave->reset();
-
-  Clock::advance(masterFlags.agent_reregister_timeout);
-  AWAIT_READY(unreachableUpdate);
-
-  // Start draining the unreachable agent.
-  ContentType contentType = GetParam();
-
-  {
-    v1::master::Call::DrainAgent drainAgent;
-    drainAgent.mutable_agent_id()->CopyFrom(agentId);
-
-    v1::master::Call call;
-    call.set_type(v1::master::Call::DRAIN_AGENT);
-    call.mutable_drain_agent()->CopyFrom(drainAgent);
-
-    post(master.get()->pid, call, contentType);
-  }
-
-  // Bring the agent back.
-  Future<ReregisterExecutorMessage> reregisterExecutor =
-    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
-
-  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
-    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
-  Future<DrainSlaveMessage> drainSlaveMesage =
-    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
-
-  Try<Owned<cluster::Slave>> recoveredSlave =
-    StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(recoveredSlave);
-
-  AWAIT_READY(reregisterExecutor);
-  Clock::advance(agentFlags.executor_reregistration_timeout);
-  Clock::settle();
-  Clock::advance(agentFlags.registration_backoff_factor);
-  Clock::settle();
-  AWAIT_READY(slaveReregisteredMessage);
-
-  // The agent should be told to drain once it reregisters.
-  AWAIT_READY(drainSlaveMesage);
-  AWAIT_READY(runningUpdate2);
-  AWAIT_READY(killedUpdate);
-}
-
-
 class AgentAPITest
   : public MesosTest,
     public WithParamInterface<ContentType>
diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
new file mode 100644
index 0000000..16d0c85
--- /dev/null
+++ b/src/tests/master_draining_tests.cpp
@@ -0,0 +1,662 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+
+#include <mesos/v1/master/master.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/cluster.hpp"
+#include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
+
+namespace http = process::http;
+
+
+using mesos::master::detector::MasterDetector;
+
+using process::Clock;
+using process::Failure;
+using process::Future;
+using process::Owned;
+
+using testing::_;
+using testing::AllOf;
+using testing::DoAll;
+using testing::Return;
+using testing::Sequence;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class MasterDrainingTest
+  : public MesosTest,
+    public WithParamInterface<ContentType>
+{
+public:
+  master::Flags CreateMasterFlags() override
+  {
+    // Use a very long allocation interval so that periodic allocations cannot
+    // race with `HierarchicalAllocator::updateAvailable()` during a test.
+    master::Flags flags = MesosTest::CreateMasterFlags();
+    flags.allocation_interval = Seconds(1000);
+    return flags;
+  }
+
+  // Sends `call` to the master's "/api/v1" endpoint using `contentType` and
+  // returns the parsed response; fails on any status other than `http::OK`.
+  Future<v1::master::Response> post(
+      const process::PID<master::Master>& pid,
+      const v1::master::Call& call,
+      const ContentType& contentType,
+      const Credential& credential = DEFAULT_CREDENTIAL)
+  {
+    http::Headers headers = createBasicAuthHeaders(credential);
+    headers["Accept"] = stringify(contentType);
+
+    return http::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType))
+      .then([contentType](const http::Response& response)
+            -> Future<v1::master::Response> {
+        if (response.status != http::OK().status) {
+          return Failure("Unexpected response status " + response.status);
+        }
+        return deserialize<v1::master::Response>(contentType, response.body);
+      });
+  }
+};
+
+
+// These tests run once for each HTTP request content type: PROTOBUF and JSON.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    MasterDrainingTest,
+    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// When an operator submits a DRAIN_AGENT call, the agent should kill its tasks
+// and the master should then report the agent as deactivated and DRAINED.
+TEST_P(MasterDrainingTest, DrainAgent)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<Nothing> registrarApplyDrained;
+  EXPECT_CALL(*master.get()->registrar, apply(_))
+    .WillOnce(DoDefault())
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyDrained),
+        Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.mutable_max_grace_period()->set_seconds(0);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  AWAIT_READY(killedUpdate);
+  AWAIT_READY(registrarApplyDrained);
+
+  // Let the acknowledgement of the TASK_KILLED update be fully processed.
+  Clock::settle();
+
+  mesos::v1::DrainInfo drainInfo;
+  drainInfo.set_state(mesos::v1::DRAINED);
+  drainInfo.mutable_config()->set_mark_gone(false);
+  drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0);
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // GET_AGENTS response.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<v1::master::Response> response =
+      post(master.get()->pid, call, contentType);
+
+    AWAIT_READY(response);
+    ASSERT_TRUE(response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
+    ASSERT_EQ(response->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        response->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+
+    EXPECT_EQ(agent.drain_info(), drainInfo);
+    EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
+  }
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // '/state' response.
+  {
+    Future<process::http::Response> response = process::http::get(
+        master.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+        "slaves[0].drain_info");
+
+    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+
+    Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>(
+        "slaves[0].estimated_drain_start_time_seconds");
+
+    ASSERT_SOME(stateDrainStartTime);
+    EXPECT_LT(0, stateDrainStartTime->as<int>());
+  }
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // '/state-summary' response.
+  {
+    Future<process::http::Response> response = process::http::get(
+        master.get()->pid,
+        "state-summary",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+        "slaves[0].drain_info");
+
+    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+
+    Result<JSON::Number> stateDrainStartTime =
+      parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds");
+
+    ASSERT_SOME(stateDrainStartTime);
+    EXPECT_LT(0, stateDrainStartTime->as<int>());
+  }
+}
+
+
+// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
+// agent should kill its tasks and the master should mark the agent gone once
+// terminal ACKs arrive; the scheduler should see TASK_GONE_BY_OPERATOR.
+TEST_P(MasterDrainingTest, DrainAgentMarkGone)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> goneUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&goneUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // When the terminal ACK is received by the master, the agent should be marked
+  // gone, which entails sending a `ShutdownMessage`.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.set_mark_gone(true);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  AWAIT_READY(goneUpdate);
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// When an operator submits a DRAIN_AGENT call with an agent that has gone
+// unreachable, the call should succeed, and the agent should be drained
+// if/when it returns to the cluster.
+TEST_P(MasterDrainingTest, DrainAgentUnreachable)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> unreachableUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate;
+
+  // Intercept the two status update acknowledgements (TASK_STARTING and
+  // TASK_RUNNING) so we can wait for both before taking the agent offline.
+  Future<StatusUpdateAcknowledgementMessage> startingAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<StatusUpdateAcknowledgementMessage> runningAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&unreachableUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // When the agent is brought back, we expect a TASK_RUNNING followed by
+  // a TASK_KILLED (due to draining).
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate2),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(startingAck);
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(runningAck);
+
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave.get()->terminate();
+  slave->reset();
+
+  Clock::advance(masterFlags.agent_reregister_timeout);
+  AWAIT_READY(unreachableUpdate);
+
+  // Start draining the unreachable agent (the HTTP response is not checked).
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  // Bring the agent back and watch for the messages this should trigger.
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMesage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  AWAIT_READY(reregisterExecutor);
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMesage);
+  AWAIT_READY(runningUpdate2);
+  AWAIT_READY(killedUpdate);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {


[mesos] 04/05: Added draining test for momentarily disconnected agents.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5c5712869876cad50a34af29cdcbfac9b1e9eb45
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Mon Aug 19 12:11:18 2019 -0700

    Added draining test for momentarily disconnected agents.
    
    This exercises the agent draining code when the agent is disconnected
    from the master at the time of starting draining.  Draining is expected
    to proceed once the agent reregisters.
    
    Review: https://reviews.apache.org/r/71317
---
 src/tests/master_draining_tests.cpp | 201 ++++++++++++++++++++++++++++++++++++
 1 file changed, 201 insertions(+)

diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
index 674f5b5..235bf1b 100644
--- a/src/tests/master_draining_tests.cpp
+++ b/src/tests/master_draining_tests.cpp
@@ -254,6 +254,99 @@ TEST_P(MasterAlreadyDrainedTest, DrainAgentMarkGone)
 }
 
 
+// When an operator submits a DRAIN_AGENT call with an agent that has
+// momentarily disconnected, the call should succeed, and the agent should
+// be drained when it returns to the cluster.
+TEST_P(MasterAlreadyDrainedTest, DrainAgentDisconnected)
+{
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave->terminate();
+  slave.reset();
+
+  ContentType contentType = GetParam();
+
+  // Ensure the agent is disconnected (not active) but not yet deactivated.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.active(), false);
+    EXPECT_EQ(agent.deactivated(), false);
+  }
+
+  // Start draining the disconnected agent.
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  // Bring the agent back and watch for its reregistration and drain message.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMesage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMesage);
+
+  // Ensure that the agent is deactivated and DRAINED in the master now.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+    EXPECT_EQ(mesos::v1::DRAINED, agent.drain_info().state());
+  }
+}
+
+
 // When an operator submits a DRAIN_AGENT call for an agent that has gone
 // unreachable, the call should succeed, and the agent should be drained
 // if/when it returns to the cluster.
@@ -627,6 +720,114 @@ TEST_P(MasterDrainingTest, DrainAgentMarkGone)
 }
 
 
+// When an operator submits a DRAIN_AGENT call with an agent that has
+// momentarily disconnected, the call should succeed, and the agent should
+// be drained when it returns to the cluster.
+TEST_P(MasterDrainingTest, DrainAgentDisconnected)
+{
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave->terminate();
+  slave.reset();
+
+  ContentType contentType = GetParam();
+
+  // Ensure the agent is disconnected (not active) but not yet deactivated.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.active(), false);
+    EXPECT_EQ(agent.deactivated(), false);
+  }
+
+  // Start draining the disconnected agent.
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  // Bring the agent back and watch for the messages this should trigger.
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMesage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  AWAIT_READY(reregisterExecutor);
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMesage);
+  AWAIT_READY(killedUpdate);
+
+  // Ensure that the agent is marked as DRAINED in the master now.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+    EXPECT_EQ(mesos::v1::DRAINED, agent.drain_info().state());
+  }
+}
+
+
 // When an operator submits a DRAIN_AGENT call with an agent that has gone
 // unreachable, the call should succeed, and the agent should be drained
 // if/when it returns to the cluster.


[mesos] 03/05: Added draining tests for empty agents.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1e3661982eba6da71a5ca8178472ef762d9fc780
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Aug 7 09:02:01 2019 -0700

    Added draining tests for empty agents.
    
    This splits the existing agent draining tests into two variants:
    1) where the agent has nothing running, and
    2) where the agent has one task running.
    
    Review: https://reviews.apache.org/r/71316
---
 src/tests/master_draining_tests.cpp | 294 ++++++++++++++++++++++++++++++------
 1 file changed, 250 insertions(+), 44 deletions(-)

diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
index eae809f..674f5b5 100644
--- a/src/tests/master_draining_tests.cpp
+++ b/src/tests/master_draining_tests.cpp
@@ -42,6 +42,8 @@
 #include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
+#include "master/registry_operations.hpp"
+
 #include "messages/messages.hpp"
 
 #include "tests/cluster.hpp"
@@ -69,12 +71,12 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
-class MasterDrainingTest
+class MasterAlreadyDrainedTest
   : public MesosTest,
     public WithParamInterface<ContentType>
 {
 public:
-  // Creates a master, agent, framework, and launches one sleep task.
+  // Creates a master and agent.
   void SetUp() override
   {
     MesosTest::SetUp();
@@ -99,6 +101,251 @@ public:
 
     Clock::advance(agentFlags.registration_backoff_factor);
     AWAIT_READY(slaveRegisteredMessage);
+    agentId = evolve(slaveRegisteredMessage->slave_id());
+  }
+
+  void TearDown() override
+  {
+    slave.reset();
+    detector.reset();
+    master.reset();
+
+    Clock::resume();
+
+    MesosTest::TearDown();
+  }
+
+  master::Flags CreateMasterFlags() override
+  {
+    // Turn off periodic allocations to avoid the race between
+    // `HierarchicalAllocator::updateAvailable()` and periodic allocations.
+    master::Flags flags = MesosTest::CreateMasterFlags();
+    flags.allocation_interval = Seconds(1000);
+    return flags;
+  }
+
+  // Helper function to post a request to "/api/v1" master endpoint and return
+  // the response.
+  Future<http::Response> post(
+      const process::PID<master::Master>& pid,
+      const v1::master::Call& call,
+      const ContentType& contentType,
+      const Credential& credential = DEFAULT_CREDENTIAL)
+  {
+    http::Headers headers = createBasicAuthHeaders(credential);
+    headers["Accept"] = stringify(contentType);
+
+    return http::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType));
+  }
+
+protected:
+  master::Flags masterFlags;
+  Owned<cluster::Master> master;
+  Owned<MasterDetector> detector;
+
+  slave::Flags agentFlags;
+  Owned<cluster::Slave> slave;
+  v1::AgentID agentId;
+};
+
+
+// These tests are parameterized by the content type of the HTTP request.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    MasterAlreadyDrainedTest,
+    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// When an operator submits a DRAIN_AGENT call, the agent with nothing running
+// should be immediately transitioned to the DRAINED state.
+TEST_P(MasterAlreadyDrainedTest, DrainAgent)
+{
+  Future<Nothing> registrarApplyDrained;
+  EXPECT_CALL(*master->registrar, apply(_))
+    .WillOnce(DoDefault())
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyDrained),
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.mutable_max_grace_period()->set_seconds(10);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  AWAIT_READY(registrarApplyDrained);
+
+  mesos::v1::DrainInfo drainInfo;
+  drainInfo.set_state(mesos::v1::DRAINED);
+  drainInfo.mutable_config()->set_mark_gone(false);
+  drainInfo.mutable_config()->mutable_max_grace_period()
+    ->set_nanoseconds(Seconds(10).ns());
+
+  // Ensure that the agent is deactivated and that its drain info (and a
+  // drain start time) is reflected in the master's GET_AGENTS response.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+
+    EXPECT_EQ(agent.drain_info(), drainInfo);
+    EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
+  }
+}
+
+
+// When an operator submits a DRAIN_AGENT call with 'mark_gone == true',
+// and the agent is not running anything, the agent should immediately be
+// marked gone.
+TEST_P(MasterAlreadyDrainedTest, DrainAgentMarkGone)
+{
+  // Since the agent is not running anything, the master should mark it gone
+  // right away, which entails sending a `ShutdownMessage`.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.set_mark_gone(true);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// When an operator submits a DRAIN_AGENT call for an agent that has gone
+// unreachable, the call should succeed, and the agent should be drained
+// if/when it returns to the cluster.
+TEST_P(MasterAlreadyDrainedTest, DrainAgentUnreachable)
+{
+  Future<Owned<master::RegistryOperation>> registrarApplyUnreachable;
+  EXPECT_CALL(*master->registrar, apply(_))
+    .WillOnce(DoAll(
+        FutureArg<0>(&registrarApplyUnreachable),
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)))
+    .WillRepeatedly(DoDefault());
+
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave->terminate();
+  slave.reset();
+
+  Clock::advance(masterFlags.agent_reregister_timeout);
+  AWAIT_READY(registrarApplyUnreachable);
+  ASSERT_NE(
+      nullptr,
+      dynamic_cast<master::MarkSlaveUnreachable*>(
+          registrarApplyUnreachable->get()));
+
+  // Start draining the unreachable agent.
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  // Bring the agent back and watch for its reregistration and drain message.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMesage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMesage);
+
+  // Ensure that the agent is deactivated and DRAINED in the master now.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<http::Response> response =
+      post(master->pid, call, contentType);
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+    Try<v1::master::Response> getAgents =
+      deserialize<v1::master::Response>(contentType, response->body);
+    ASSERT_SOME(getAgents);
+
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type());
+    ASSERT_EQ(getAgents->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        getAgents->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+    EXPECT_EQ(mesos::v1::DRAINED, agent.drain_info().state());
+  }
+}
+
+
+class MasterDrainingTest
+  : public MasterAlreadyDrainedTest
+{
+public:
+  // Creates a master, agent, framework, and launches one sleep task.
+  void SetUp() override
+  {
+    MasterAlreadyDrainedTest::SetUp();
 
     // Create the framework.
     scheduler = std::make_shared<v1::MockHTTPScheduler>();
@@ -134,7 +381,6 @@ public:
     ASSERT_FALSE(offers->offers().empty());
 
     const v1::Offer& offer = offers->offers(0);
-    agentId = offer.agent_id();
 
     Try<v1::Resources> resources =
       v1::Resources::parse("cpus:0.1;mem:64;disk:64");
@@ -189,51 +435,11 @@ public:
   {
     mesos.reset();
     scheduler.reset();
-    slave.reset();
-    detector.reset();
-    master.reset();
-
-    Clock::resume();
-
-    MesosTest::TearDown();
-  }
-
-  master::Flags CreateMasterFlags() override
-  {
-    // Turn off periodic allocations to avoid the race between
-    // `HierarchicalAllocator::updateAvailable()` and periodic allocations.
-    master::Flags flags = MesosTest::CreateMasterFlags();
-    flags.allocation_interval = Seconds(1000);
-    return flags;
-  }
-
-  // Helper function to post a request to "/api/v1" master endpoint and return
-  // the response.
-  Future<http::Response> post(
-      const process::PID<master::Master>& pid,
-      const v1::master::Call& call,
-      const ContentType& contentType)
-  {
-    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-    headers["Accept"] = stringify(contentType);
 
-    return http::post(
-        pid,
-        "api/v1",
-        headers,
-        serialize(contentType, call),
-        stringify(contentType));
+    MasterAlreadyDrainedTest::TearDown();
   }
 
 protected:
-  master::Flags masterFlags;
-  Owned<cluster::Master> master;
-  Owned<MasterDetector> detector;
-
-  slave::Flags agentFlags;
-  Owned<cluster::Slave> slave;
-  v1::AgentID agentId;
-
   std::shared_ptr<v1::MockHTTPScheduler> scheduler;
   v1::FrameworkInfo frameworkInfo;
   std::shared_ptr<v1::scheduler::TestMesos> mesos;


[mesos] 05/05: Added agent reactivations to the existing agent draining tests.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 50dcd56a42ee03d354f39cb029befe9e60e7f0bf
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Mon Aug 19 14:35:34 2019 -0700

    Added agent reactivations to the existing agent draining tests.
    
    This adds an extra step to a couple of the agent draining tests,
    which calls REACTIVATE_AGENT at the end.
    
    Review: https://reviews.apache.org/r/71318
---
 src/tests/master_draining_tests.cpp | 93 +++++++++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
index 235bf1b..f1a00df 100644
--- a/src/tests/master_draining_tests.cpp
+++ b/src/tests/master_draining_tests.cpp
@@ -563,11 +563,18 @@ TEST_P(MasterDrainingTest, DrainAgent)
         FutureArg<1>(&killedUpdate),
         v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
+  Future<StatusUpdateAcknowledgementMessage> killedAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
   Future<Nothing> registrarApplyDrained;
+  Future<Nothing> registrarApplyReactivated;
   EXPECT_CALL(*master->registrar, apply(_))
     .WillOnce(DoDefault())
     .WillOnce(DoAll(
         FutureSatisfy(&registrarApplyDrained),
+        Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)))
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyReactivated),
         Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply)));
 
   ContentType contentType = GetParam();
@@ -587,6 +594,7 @@ TEST_P(MasterDrainingTest, DrainAgent)
   }
 
   AWAIT_READY(killedUpdate);
+  AWAIT_READY(killedAck);
   AWAIT_READY(registrarApplyDrained);
 
   // Ensure that the update acknowledgement has been processed.
@@ -676,6 +684,33 @@ TEST_P(MasterDrainingTest, DrainAgent)
     ASSERT_SOME(stateDrainStartTime);
     EXPECT_LT(0, stateDrainStartTime->as<int>());
   }
+
+  // Reactivate the agent and expect to get the agent in an offer.
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    v1::master::Call::ReactivateAgent reactivateAgent;
+    reactivateAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::REACTIVATE_AGENT);
+    call.mutable_reactivate_agent()->CopyFrom(reactivateAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  AWAIT_READY(registrarApplyReactivated);
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  EXPECT_EQ(agentId, offers->offers(0).agent_id());
 }
 
 
@@ -788,6 +823,9 @@ TEST_P(MasterDrainingTest, DrainAgentDisconnected)
         FutureArg<1>(&killedUpdate),
         v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
+  Future<StatusUpdateAcknowledgementMessage> killedAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
   Try<Owned<cluster::Slave>> recoveredSlave =
     StartSlave(detector.get(), agentFlags);
   ASSERT_SOME(recoveredSlave);
@@ -802,6 +840,7 @@ TEST_P(MasterDrainingTest, DrainAgentDisconnected)
   // The agent should be told to drain once it reregisters.
   AWAIT_READY(drainSlaveMesage);
   AWAIT_READY(killedUpdate);
+  AWAIT_READY(killedAck);
 
   // Ensure that the agent is marked as DRAINED in the master now.
   {
@@ -825,6 +864,31 @@ TEST_P(MasterDrainingTest, DrainAgentDisconnected)
     EXPECT_EQ(agent.deactivated(), true);
     EXPECT_EQ(mesos::v1::DRAINED, agent.drain_info().state());
   }
+
+  // Reactivate the agent and expect to get the agent in an offer.
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    v1::master::Call::ReactivateAgent reactivateAgent;
+    reactivateAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::REACTIVATE_AGENT);
+    call.mutable_reactivate_agent()->CopyFrom(reactivateAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  EXPECT_EQ(agentId, offers->offers(0).agent_id());
 }
 
 
@@ -870,6 +934,9 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable)
         FutureArg<1>(&killedUpdate),
         v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
+  Future<StatusUpdateAcknowledgementMessage> killedAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
   // Simulate an agent crash, so that it disconnects from the master.
   slave->terminate();
   slave.reset();
@@ -918,6 +985,32 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable)
   AWAIT_READY(drainSlaveMesage);
   AWAIT_READY(runningUpdate);
   AWAIT_READY(killedUpdate);
+  AWAIT_READY(killedAck);
+
+  // Reactivate the agent and expect to get the agent in an offer.
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    v1::master::Call::ReactivateAgent reactivateAgent;
+    reactivateAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::REACTIVATE_AGENT);
+    call.mutable_reactivate_agent()->CopyFrom(reactivateAgent);
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::OK().status,
+        post(master->pid, call, contentType));
+  }
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  EXPECT_EQ(agentId, offers->offers(0).agent_id());
 }
 
 } // namespace tests {