You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/08/26 23:09:57 UTC

[mesos] 01/05: Moved master-side agent draining tests into a separate file.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5124b290ddc368e2e7cc3d56173fb4b3137af620
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Jul 24 15:45:22 2019 -0700

    Moved master-side agent draining tests into a separate file.
    
    The test bodies were not changed, besides renaming the test class.
    
    Review: https://reviews.apache.org/r/71314
---
 src/Makefile.am                     |   3 +-
 src/tests/CMakeLists.txt            |   1 +
 src/tests/api_tests.cpp             | 541 -----------------------------
 src/tests/master_draining_tests.cpp | 662 ++++++++++++++++++++++++++++++++++++
 4 files changed, 665 insertions(+), 542 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index a89cd61..577acfd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2608,7 +2608,8 @@ mesos_tests_SOURCES =						\
   tests/master_allocator_tests.cpp				\
   tests/master_authorization_tests.cpp				\
   tests/master_benchmarks.cpp					\
-  tests/master_contender_detector_tests.cpp     \
+  tests/master_contender_detector_tests.cpp			\
+  tests/master_draining_tests.cpp				\
   tests/master_load_tests.cpp					\
   tests/master_maintenance_tests.cpp				\
   tests/master_quota_tests.cpp					\
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 04c552a..1e53b39 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -105,6 +105,7 @@ set(MESOS_TESTS_SRC
   hook_tests.cpp
   http_authentication_tests.cpp
   http_fault_tolerance_tests.cpp
+  master_draining_tests.cpp
   master_load_tests.cpp
   master_maintenance_tests.cpp
   master_slave_reconciliation_tests.cpp
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index a735a20..bd207ea 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -5470,547 +5470,6 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable)
 }
 
 
-// When an operator submits a DRAIN_AGENT call, the agent should kill all
-// running tasks.
-TEST_P(MasterAPITest, DrainAgent)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
-    .WillRepeatedly(Return());
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(runningUpdate);
-
-  Future<v1::scheduler::Event::Update> killedUpdate;
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
-    .WillOnce(DoAll(
-        FutureArg<1>(&killedUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  Future<Nothing> registrarApplyDrained;
-  EXPECT_CALL(*master.get()->registrar, apply(_))
-    .WillOnce(DoDefault())
-    .WillOnce(DoAll(
-        FutureSatisfy(&registrarApplyDrained),
-        Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
-
-  ContentType contentType = GetParam();
-
-  {
-    v1::master::Call::DrainAgent drainAgent;
-    drainAgent.mutable_agent_id()->CopyFrom(agentId);
-    drainAgent.mutable_max_grace_period()->set_seconds(0);
-
-    v1::master::Call call;
-    call.set_type(v1::master::Call::DRAIN_AGENT);
-    call.mutable_drain_agent()->CopyFrom(drainAgent);
-
-    post(master.get()->pid, call, contentType);
-  }
-
-  AWAIT_READY(killedUpdate);
-  AWAIT_READY(registrarApplyDrained);
-
-  // Ensure that the update acknowledgement has been processed.
-  Clock::settle();
-
-  mesos::v1::DrainInfo drainInfo;
-  drainInfo.set_state(mesos::v1::DRAINED);
-  drainInfo.mutable_config()->set_mark_gone(false);
-  drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0);
-
-  // Ensure that the agent's drain info is reflected in the master's
-  // GET_AGENTS response.
-  {
-    v1::master::Call call;
-    call.set_type(v1::master::Call::GET_AGENTS);
-
-    Future<v1::master::Response> response =
-      post(master.get()->pid, call, contentType);
-
-    AWAIT_READY(response);
-    ASSERT_TRUE(response->IsInitialized());
-    ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
-    ASSERT_EQ(response->get_agents().agents_size(), 1);
-
-    const v1::master::Response::GetAgents::Agent& agent =
-        response->get_agents().agents(0);
-
-    EXPECT_EQ(agent.deactivated(), true);
-
-    EXPECT_EQ(agent.drain_info(), drainInfo);
-    EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
-  }
-
-  // Ensure that the agent's drain info is reflected in the master's
-  // '/state' response.
-  {
-    Future<process::http::Response> response = process::http::get(
-        master.get()->pid,
-        "state",
-        None(),
-        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
-    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
-    ASSERT_SOME(parse);
-
-    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
-        "slaves[0].drain_info");
-
-    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
-
-    Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>(
-        "slaves[0].estimated_drain_start_time_seconds");
-
-    ASSERT_SOME(stateDrainStartTime);
-    EXPECT_LT(0, stateDrainStartTime->as<int>());
-  }
-
-  // Ensure that the agent's drain info is reflected in the master's
-  // '/state-summary' response.
-  {
-    Future<process::http::Response> response = process::http::get(
-        master.get()->pid,
-        "state-summary",
-        None(),
-        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
-    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
-    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
-    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
-    ASSERT_SOME(parse);
-
-    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
-        "slaves[0].drain_info");
-
-    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
-
-    Result<JSON::Number> stateDrainStartTime =
-      parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds");
-
-    ASSERT_SOME(stateDrainStartTime);
-    EXPECT_LT(0, stateDrainStartTime->as<int>());
-  }
-}
-
-
-// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
-// agent should kill all running tasks and the master should mark the agent gone
-// once terminal ACKs have been received.
-TEST_P(MasterAPITest, DrainAgentMarkGone)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
-    .WillRepeatedly(Return());
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(runningUpdate);
-
-  Future<v1::scheduler::Event::Update> goneUpdate;
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
-    .WillOnce(DoAll(
-        FutureArg<1>(&goneUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  // When the terminal ACK is received by the master, the agent should be marked
-  // gone, which entails sending a `ShutdownMessage`.
-  Future<ShutdownMessage> shutdownMessage =
-    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
-
-  ContentType contentType = GetParam();
-
-  {
-    v1::master::Call::DrainAgent drainAgent;
-    drainAgent.mutable_agent_id()->CopyFrom(agentId);
-    drainAgent.set_mark_gone(true);
-
-    v1::master::Call call;
-    call.set_type(v1::master::Call::DRAIN_AGENT);
-    call.mutable_drain_agent()->CopyFrom(drainAgent);
-
-    post(master.get()->pid, call, contentType);
-  }
-
-  AWAIT_READY(goneUpdate);
-  AWAIT_READY(shutdownMessage);
-}
-
-
-// When an operator submits a DRAIN_AGENT call with an agent that has gone
-// unreachable, the call should succeed, and the agent should be drained
-// if/when it returns to the cluster.
-TEST_P(MasterAPITest, DrainAgentUnreachable)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  slave::Flags agentFlags = CreateSlaveFlags();
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(slave);
-
-  Clock::advance(agentFlags.registration_backoff_factor);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true);
-  frameworkInfo.add_capabilities()->set_type(
-      v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
-  Future<v1::scheduler::Event::Subscribed> subscribed;
-  EXPECT_CALL(*scheduler, subscribed(_, _))
-    .WillOnce(FutureArg<1>(&subscribed));
-
-  EXPECT_CALL(*scheduler, heartbeat(_))
-    .WillRepeatedly(Return()); // Ignore heartbeats.
-
-  Future<v1::scheduler::Event::Offers> offers;
-  EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());
-
-  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
-      master.get()->pid, ContentType::PROTOBUF, scheduler);
-
-  AWAIT_READY(subscribed);
-  v1::FrameworkID frameworkId(subscribed->framework_id());
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
-
-  Try<v1::Resources> resources =
-    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
-  ASSERT_SOME(resources);
-
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
-  testing::Sequence updateSequence;
-  Future<v1::scheduler::Event::Update> startingUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate;
-  Future<v1::scheduler::Event::Update> unreachableUpdate;
-  Future<v1::scheduler::Event::Update> runningUpdate2;
-  Future<v1::scheduler::Event::Update> killedUpdate;
-
-  // Make absolutely sure the agent receives these two acknowledgements
-  // before forcing the agent offline.
-  Future<StatusUpdateAcknowledgementMessage> startingAck =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-  Future<StatusUpdateAcknowledgementMessage> runningAck =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&startingUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&unreachableUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  // When the agent is brought back, we expect a TASK_RUNNING followed by
-  // a TASK_KILLED (due to draining).
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&runningUpdate2),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  EXPECT_CALL(
-      *scheduler,
-      update(_, AllOf(
-            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
-            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
-    .InSequence(updateSequence)
-    .WillOnce(DoAll(
-        FutureArg<1>(&killedUpdate),
-        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
-  mesos->send(
-      v1::createCallAccept(
-          frameworkId,
-          offer,
-          {v1::LAUNCH({taskInfo})}));
-
-  AWAIT_READY(startingUpdate);
-  AWAIT_READY(startingAck);
-  AWAIT_READY(runningUpdate);
-  AWAIT_READY(runningAck);
-
-  // Simulate an agent crash, so that it disconnects from the master.
-  slave.get()->terminate();
-  slave->reset();
-
-  Clock::advance(masterFlags.agent_reregister_timeout);
-  AWAIT_READY(unreachableUpdate);
-
-  // Start draining the unreachable agent.
-  ContentType contentType = GetParam();
-
-  {
-    v1::master::Call::DrainAgent drainAgent;
-    drainAgent.mutable_agent_id()->CopyFrom(agentId);
-
-    v1::master::Call call;
-    call.set_type(v1::master::Call::DRAIN_AGENT);
-    call.mutable_drain_agent()->CopyFrom(drainAgent);
-
-    post(master.get()->pid, call, contentType);
-  }
-
-  // Bring the agent back.
-  Future<ReregisterExecutorMessage> reregisterExecutor =
-    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
-
-  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
-    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
-  Future<DrainSlaveMessage> drainSlaveMesage =
-    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
-
-  Try<Owned<cluster::Slave>> recoveredSlave =
-    StartSlave(detector.get(), agentFlags);
-  ASSERT_SOME(recoveredSlave);
-
-  AWAIT_READY(reregisterExecutor);
-  Clock::advance(agentFlags.executor_reregistration_timeout);
-  Clock::settle();
-  Clock::advance(agentFlags.registration_backoff_factor);
-  Clock::settle();
-  AWAIT_READY(slaveReregisteredMessage);
-
-  // The agent should be told to drain once it reregisters.
-  AWAIT_READY(drainSlaveMesage);
-  AWAIT_READY(runningUpdate2);
-  AWAIT_READY(killedUpdate);
-}
-
-
 class AgentAPITest
   : public MesosTest,
     public WithParamInterface<ContentType>
diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
new file mode 100644
index 0000000..16d0c85
--- /dev/null
+++ b/src/tests/master_draining_tests.cpp
@@ -0,0 +1,662 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+
+#include <mesos/v1/master/master.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/cluster.hpp"
+#include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
+
+namespace http = process::http;
+
+
+using mesos::master::detector::MasterDetector;
+
+using process::Clock;
+using process::Failure;
+using process::Future;
+using process::Owned;
+
+using testing::_;
+using testing::AllOf;
+using testing::DoAll;
+using testing::Return;
+using testing::Sequence;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class MasterDrainingTest
+  : public MesosTest,
+    public WithParamInterface<ContentType>
+{
+public:
+  master::Flags CreateMasterFlags() override
+  {
+    // Use a very long allocation interval so that periodic allocations do
+    // not race with `HierarchicalAllocator::updateAvailable()`.
+    master::Flags flags = MesosTest::CreateMasterFlags();
+    flags.allocation_interval = Seconds(1000);
+    return flags;
+  }
+
+  // Posts `call` to the master's "api/v1" endpoint (with basic auth headers)
+  // and returns the deserialized response, or a failure if status is not OK.
+  Future<v1::master::Response> post(
+      const process::PID<master::Master>& pid,
+      const v1::master::Call& call,
+      const ContentType& contentType,
+      const Credential& credential = DEFAULT_CREDENTIAL)
+  {
+    http::Headers headers = createBasicAuthHeaders(credential);
+    headers["Accept"] = stringify(contentType);  // Request the same format.
+
+    return http::post(
+        pid,
+        "api/v1",
+        headers,
+        serialize(contentType, call),
+        stringify(contentType))
+      .then([contentType](const http::Response& response)
+            -> Future<v1::master::Response> {
+        if (response.status != http::OK().status) {
+          return Failure("Unexpected response status " + response.status);
+        }
+        return deserialize<v1::master::Response>(contentType, response.body);
+      });
+  }
+};
+
+
+// These tests are parameterized by the content type of the HTTP request.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    MasterDrainingTest,
+    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// A DRAIN_AGENT call should kill the agent's running task; the DRAINED state
+// should then show up in GET_AGENTS, '/state', and '/state-summary'.
+TEST_P(MasterDrainingTest, DrainAgent)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<Nothing> registrarApplyDrained;
+  EXPECT_CALL(*master.get()->registrar, apply(_))
+    .WillOnce(DoDefault())
+    .WillOnce(DoAll(
+        FutureSatisfy(&registrarApplyDrained),
+        Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+  ContentType contentType = GetParam();  // Used only for operator API calls.
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.mutable_max_grace_period()->set_seconds(0);  // No grace period.
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);  // Response is not awaited.
+  }
+
+  AWAIT_READY(killedUpdate);
+  AWAIT_READY(registrarApplyDrained);
+
+  // Ensure that the update acknowledgement has been processed.
+  Clock::settle();
+
+  mesos::v1::DrainInfo drainInfo;
+  drainInfo.set_state(mesos::v1::DRAINED);
+  drainInfo.mutable_config()->set_mark_gone(false);
+  drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0);
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // GET_AGENTS response.
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_AGENTS);
+
+    Future<v1::master::Response> response =
+      post(master.get()->pid, call, contentType);
+
+    AWAIT_READY(response);
+    ASSERT_TRUE(response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
+    ASSERT_EQ(response->get_agents().agents_size(), 1);
+
+    const v1::master::Response::GetAgents::Agent& agent =
+        response->get_agents().agents(0);
+
+    EXPECT_EQ(agent.deactivated(), true);
+
+    EXPECT_EQ(agent.drain_info(), drainInfo);
+    EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
+  }
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // '/state' response.
+  {
+    Future<process::http::Response> response = process::http::get(
+        master.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+        "slaves[0].drain_info");
+
+    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+
+    Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>(
+        "slaves[0].estimated_drain_start_time_seconds");
+
+    ASSERT_SOME(stateDrainStartTime);
+    EXPECT_LT(0, stateDrainStartTime->as<int>());
+  }
+
+  // Ensure that the agent's drain info is reflected in the master's
+  // '/state-summary' response.
+  {
+    Future<process::http::Response> response = process::http::get(
+        master.get()->pid,
+        "state-summary",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+        "slaves[0].drain_info");
+
+    ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+
+    Result<JSON::Number> stateDrainStartTime =
+      parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds");
+
+    ASSERT_SOME(stateDrainStartTime);
+    EXPECT_LT(0, stateDrainStartTime->as<int>());
+  }
+}
+
+
+// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
+// agent should kill all running tasks and the master should mark the agent gone
+// once terminal ACKs have been received.
+TEST_P(MasterDrainingTest, DrainAgentMarkGone)
+{
+  Clock::pause(); // Time is advanced explicitly below.
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+  // Advance by the registration backoff and wait for the agent to register.
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+  // Expect TASK_STARTING followed by TASK_RUNNING for the task, in order.
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return()); // Ignore further TASK_RUNNING updates.
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+  // Expect the task to be reported (and ACKed) as TASK_GONE_BY_OPERATOR.
+  Future<v1::scheduler::Event::Update> goneUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
+    .WillOnce(DoAll(
+        FutureArg<1>(&goneUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // When the terminal ACK is received by the master, the agent should be marked
+  // gone, which entails sending a `ShutdownMessage`.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  ContentType contentType = GetParam(); // Used for the operator call below.
+  // Submit the DRAIN_AGENT call with 'mark_gone' set.
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.set_mark_gone(true);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  AWAIT_READY(goneUpdate);
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// When an operator submits a DRAIN_AGENT call with an agent that has gone
+// unreachable, the call should succeed, and the agent should be drained
+// if/when it returns to the cluster.
+TEST_P(MasterDrainingTest, DrainAgentUnreachable)
+{
+  Clock::pause(); // Time is advanced explicitly below.
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+  // Advance by the registration backoff and wait for the agent to register.
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Presumably for agent recovery.
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+  // Updates expected for the task, in this order (see `updateSequence`).
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> unreachableUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate;
+
+  // Make sure the agent receives both ACKs before it is forced offline. The
+  // matchers below are identical, so either future may capture either ACK.
+  Future<StatusUpdateAcknowledgementMessage> startingAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<StatusUpdateAcknowledgementMessage> runningAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&unreachableUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // When the agent is brought back, we expect a TASK_RUNNING followed by
+  // a TASK_KILLED (due to draining).
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate2),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+            TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(startingAck);
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(runningAck);
+
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave.get()->terminate();
+  slave->reset();
+  // Once the reregister timeout elapses, the task is reported unreachable.
+  Clock::advance(masterFlags.agent_reregister_timeout);
+  AWAIT_READY(unreachableUpdate);
+
+  // Start draining the unreachable agent.
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  // Bring the agent back, setting up futures for the messages awaited below.
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMesage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+  // After the executor reregisters, advance the clock until the agent does.
+  AWAIT_READY(reregisterExecutor);
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMesage);
+  AWAIT_READY(runningUpdate2);
+  AWAIT_READY(killedUpdate);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {