You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/08/26 23:09:57 UTC
[mesos] 01/05: Moved master-side agent draining tests into a
separate file.
This is an automated email from the ASF dual-hosted git repository.
josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 5124b290ddc368e2e7cc3d56173fb4b3137af620
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Wed Jul 24 15:45:22 2019 -0700
Moved master-side agent draining tests into a separate file.
The test bodies were not changed, besides renaming the test class.
Review: https://reviews.apache.org/r/71314
---
src/Makefile.am | 3 +-
src/tests/CMakeLists.txt | 1 +
src/tests/api_tests.cpp | 541 -----------------------------
src/tests/master_draining_tests.cpp | 662 ++++++++++++++++++++++++++++++++++++
4 files changed, 665 insertions(+), 542 deletions(-)
diff --git a/src/Makefile.am b/src/Makefile.am
index a89cd61..577acfd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2608,7 +2608,8 @@ mesos_tests_SOURCES = \
tests/master_allocator_tests.cpp \
tests/master_authorization_tests.cpp \
tests/master_benchmarks.cpp \
- tests/master_contender_detector_tests.cpp \
+ tests/master_contender_detector_tests.cpp \
+ tests/master_draining_tests.cpp \
tests/master_load_tests.cpp \
tests/master_maintenance_tests.cpp \
tests/master_quota_tests.cpp \
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 04c552a..1e53b39 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -105,6 +105,7 @@ set(MESOS_TESTS_SRC
hook_tests.cpp
http_authentication_tests.cpp
http_fault_tolerance_tests.cpp
+ master_draining_tests.cpp
master_load_tests.cpp
master_maintenance_tests.cpp
master_slave_reconciliation_tests.cpp
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index a735a20..bd207ea 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -5470,547 +5470,6 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable)
}
-// When an operator submits a DRAIN_AGENT call, the agent should kill all
-// running tasks.
-TEST_P(MasterAPITest, DrainAgent)
-{
- Clock::pause();
-
- master::Flags masterFlags = CreateMasterFlags();
- Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
- ASSERT_SOME(master);
-
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- slave::Flags agentFlags = CreateSlaveFlags();
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
- ASSERT_SOME(slave);
-
- Clock::advance(agentFlags.registration_backoff_factor);
-
- AWAIT_READY(slaveRegisteredMessage);
-
- auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
- v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.add_capabilities()->set_type(
- v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
- EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
- Future<v1::scheduler::Event::Subscribed> subscribed;
- EXPECT_CALL(*scheduler, subscribed(_, _))
- .WillOnce(FutureArg<1>(&subscribed));
-
- EXPECT_CALL(*scheduler, heartbeat(_))
- .WillRepeatedly(Return()); // Ignore heartbeats.
-
- Future<v1::scheduler::Event::Offers> offers;
- EXPECT_CALL(*scheduler, offers(_, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return());
-
- auto mesos = std::make_shared<v1::scheduler::TestMesos>(
- master.get()->pid, ContentType::PROTOBUF, scheduler);
-
- AWAIT_READY(subscribed);
- v1::FrameworkID frameworkId(subscribed->framework_id());
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->offers().empty());
-
- const v1::Offer& offer = offers->offers(0);
- const v1::AgentID& agentId = offer.agent_id();
-
- Try<v1::Resources> resources =
- v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
- ASSERT_SOME(resources);
-
- v1::TaskInfo taskInfo =
- v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
- testing::Sequence updateSequence;
- Future<v1::scheduler::Event::Update> startingUpdate;
- Future<v1::scheduler::Event::Update> runningUpdate;
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_STARTING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&startingUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&runningUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)))
- .WillRepeatedly(Return());
-
- mesos->send(
- v1::createCallAccept(
- frameworkId,
- offer,
- {v1::LAUNCH({taskInfo})}));
-
- AWAIT_READY(startingUpdate);
- AWAIT_READY(runningUpdate);
-
- Future<v1::scheduler::Event::Update> killedUpdate;
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_KILLED))))
- .WillOnce(DoAll(
- FutureArg<1>(&killedUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- Future<Nothing> registrarApplyDrained;
- EXPECT_CALL(*master.get()->registrar, apply(_))
- .WillOnce(DoDefault())
- .WillOnce(DoAll(
- FutureSatisfy(&registrarApplyDrained),
- Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
-
- ContentType contentType = GetParam();
-
- {
- v1::master::Call::DrainAgent drainAgent;
- drainAgent.mutable_agent_id()->CopyFrom(agentId);
- drainAgent.mutable_max_grace_period()->set_seconds(0);
-
- v1::master::Call call;
- call.set_type(v1::master::Call::DRAIN_AGENT);
- call.mutable_drain_agent()->CopyFrom(drainAgent);
-
- post(master.get()->pid, call, contentType);
- }
-
- AWAIT_READY(killedUpdate);
- AWAIT_READY(registrarApplyDrained);
-
- // Ensure that the update acknowledgement has been processed.
- Clock::settle();
-
- mesos::v1::DrainInfo drainInfo;
- drainInfo.set_state(mesos::v1::DRAINED);
- drainInfo.mutable_config()->set_mark_gone(false);
- drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0);
-
- // Ensure that the agent's drain info is reflected in the master's
- // GET_AGENTS response.
- {
- v1::master::Call call;
- call.set_type(v1::master::Call::GET_AGENTS);
-
- Future<v1::master::Response> response =
- post(master.get()->pid, call, contentType);
-
- AWAIT_READY(response);
- ASSERT_TRUE(response->IsInitialized());
- ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
- ASSERT_EQ(response->get_agents().agents_size(), 1);
-
- const v1::master::Response::GetAgents::Agent& agent =
- response->get_agents().agents(0);
-
- EXPECT_EQ(agent.deactivated(), true);
-
- EXPECT_EQ(agent.drain_info(), drainInfo);
- EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
- }
-
- // Ensure that the agent's drain info is reflected in the master's
- // '/state' response.
- {
- Future<process::http::Response> response = process::http::get(
- master.get()->pid,
- "state",
- None(),
- createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
- ASSERT_SOME(parse);
-
- Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
- "slaves[0].drain_info");
-
- ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
-
- Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>(
- "slaves[0].estimated_drain_start_time_seconds");
-
- ASSERT_SOME(stateDrainStartTime);
- EXPECT_LT(0, stateDrainStartTime->as<int>());
- }
-
- // Ensure that the agent's drain info is reflected in the master's
- // '/state-summary' response.
- {
- Future<process::http::Response> response = process::http::get(
- master.get()->pid,
- "state-summary",
- None(),
- createBasicAuthHeaders(DEFAULT_CREDENTIAL));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
-
- Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
- ASSERT_SOME(parse);
-
- Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
- "slaves[0].drain_info");
-
- ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
-
- Result<JSON::Number> stateDrainStartTime =
- parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds");
-
- ASSERT_SOME(stateDrainStartTime);
- EXPECT_LT(0, stateDrainStartTime->as<int>());
- }
-}
-
-
-// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
-// agent should kill all running tasks and the master should mark the agent gone
-// once terminal ACKs have been received.
-TEST_P(MasterAPITest, DrainAgentMarkGone)
-{
- Clock::pause();
-
- master::Flags masterFlags = CreateMasterFlags();
- Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
- ASSERT_SOME(master);
-
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- slave::Flags agentFlags = CreateSlaveFlags();
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
- ASSERT_SOME(slave);
-
- Clock::advance(agentFlags.registration_backoff_factor);
-
- AWAIT_READY(slaveRegisteredMessage);
-
- auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
- v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.add_capabilities()->set_type(
- v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
- EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
- Future<v1::scheduler::Event::Subscribed> subscribed;
- EXPECT_CALL(*scheduler, subscribed(_, _))
- .WillOnce(FutureArg<1>(&subscribed));
-
- EXPECT_CALL(*scheduler, heartbeat(_))
- .WillRepeatedly(Return()); // Ignore heartbeats.
-
- Future<v1::scheduler::Event::Offers> offers;
- EXPECT_CALL(*scheduler, offers(_, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return());
-
- auto mesos = std::make_shared<v1::scheduler::TestMesos>(
- master.get()->pid, ContentType::PROTOBUF, scheduler);
-
- AWAIT_READY(subscribed);
- v1::FrameworkID frameworkId(subscribed->framework_id());
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->offers().empty());
-
- const v1::Offer& offer = offers->offers(0);
- const v1::AgentID& agentId = offer.agent_id();
-
- Try<v1::Resources> resources =
- v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
- ASSERT_SOME(resources);
-
- v1::TaskInfo taskInfo =
- v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
- testing::Sequence updateSequence;
- Future<v1::scheduler::Event::Update> startingUpdate;
- Future<v1::scheduler::Event::Update> runningUpdate;
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_STARTING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&startingUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&runningUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)))
- .WillRepeatedly(Return());
-
- mesos->send(
- v1::createCallAccept(
- frameworkId,
- offer,
- {v1::LAUNCH({taskInfo})}));
-
- AWAIT_READY(startingUpdate);
- AWAIT_READY(runningUpdate);
-
- Future<v1::scheduler::Event::Update> goneUpdate;
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
- .WillOnce(DoAll(
- FutureArg<1>(&goneUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- // When the terminal ACK is received by the master, the agent should be marked
- // gone, which entails sending a `ShutdownMessage`.
- Future<ShutdownMessage> shutdownMessage =
- FUTURE_PROTOBUF(ShutdownMessage(), _, _);
-
- ContentType contentType = GetParam();
-
- {
- v1::master::Call::DrainAgent drainAgent;
- drainAgent.mutable_agent_id()->CopyFrom(agentId);
- drainAgent.set_mark_gone(true);
-
- v1::master::Call call;
- call.set_type(v1::master::Call::DRAIN_AGENT);
- call.mutable_drain_agent()->CopyFrom(drainAgent);
-
- post(master.get()->pid, call, contentType);
- }
-
- AWAIT_READY(goneUpdate);
- AWAIT_READY(shutdownMessage);
-}
-
-
-// When an operator submits a DRAIN_AGENT call with an agent that has gone
-// unreachable, the call should succeed, and the agent should be drained
-// if/when it returns to the cluster.
-TEST_P(MasterAPITest, DrainAgentUnreachable)
-{
- Clock::pause();
-
- master::Flags masterFlags = CreateMasterFlags();
- Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
- ASSERT_SOME(master);
-
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- slave::Flags agentFlags = CreateSlaveFlags();
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
- ASSERT_SOME(slave);
-
- Clock::advance(agentFlags.registration_backoff_factor);
-
- AWAIT_READY(slaveRegisteredMessage);
-
- auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
-
- v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.set_checkpoint(true);
- frameworkInfo.add_capabilities()->set_type(
- v1::FrameworkInfo::Capability::PARTITION_AWARE);
-
- EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
-
- Future<v1::scheduler::Event::Subscribed> subscribed;
- EXPECT_CALL(*scheduler, subscribed(_, _))
- .WillOnce(FutureArg<1>(&subscribed));
-
- EXPECT_CALL(*scheduler, heartbeat(_))
- .WillRepeatedly(Return()); // Ignore heartbeats.
-
- Future<v1::scheduler::Event::Offers> offers;
- EXPECT_CALL(*scheduler, offers(_, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return());
-
- auto mesos = std::make_shared<v1::scheduler::TestMesos>(
- master.get()->pid, ContentType::PROTOBUF, scheduler);
-
- AWAIT_READY(subscribed);
- v1::FrameworkID frameworkId(subscribed->framework_id());
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->offers().empty());
-
- const v1::Offer& offer = offers->offers(0);
- const v1::AgentID& agentId = offer.agent_id();
-
- Try<v1::Resources> resources =
- v1::Resources::parse("cpus:0.1;mem:64;disk:64");
-
- ASSERT_SOME(resources);
-
- v1::TaskInfo taskInfo =
- v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
-
- testing::Sequence updateSequence;
- Future<v1::scheduler::Event::Update> startingUpdate;
- Future<v1::scheduler::Event::Update> runningUpdate;
- Future<v1::scheduler::Event::Update> unreachableUpdate;
- Future<v1::scheduler::Event::Update> runningUpdate2;
- Future<v1::scheduler::Event::Update> killedUpdate;
-
- // Make absolutely sure the agent receives these two acknowledgements
- // before forcing the agent offline.
- Future<StatusUpdateAcknowledgementMessage> startingAck =
- FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
- Future<StatusUpdateAcknowledgementMessage> runningAck =
- FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_STARTING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&startingUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&runningUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&unreachableUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- // When the agent is brought back, we expect a TASK_RUNNING followed by
- // a TASK_KILLED (due to draining).
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&runningUpdate2),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- EXPECT_CALL(
- *scheduler,
- update(_, AllOf(
- TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
- TaskStatusUpdateStateEq(v1::TASK_KILLED))))
- .InSequence(updateSequence)
- .WillOnce(DoAll(
- FutureArg<1>(&killedUpdate),
- v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
- mesos->send(
- v1::createCallAccept(
- frameworkId,
- offer,
- {v1::LAUNCH({taskInfo})}));
-
- AWAIT_READY(startingUpdate);
- AWAIT_READY(startingAck);
- AWAIT_READY(runningUpdate);
- AWAIT_READY(runningAck);
-
- // Simulate an agent crash, so that it disconnects from the master.
- slave.get()->terminate();
- slave->reset();
-
- Clock::advance(masterFlags.agent_reregister_timeout);
- AWAIT_READY(unreachableUpdate);
-
- // Start draining the unreachable agent.
- ContentType contentType = GetParam();
-
- {
- v1::master::Call::DrainAgent drainAgent;
- drainAgent.mutable_agent_id()->CopyFrom(agentId);
-
- v1::master::Call call;
- call.set_type(v1::master::Call::DRAIN_AGENT);
- call.mutable_drain_agent()->CopyFrom(drainAgent);
-
- post(master.get()->pid, call, contentType);
- }
-
- // Bring the agent back.
- Future<ReregisterExecutorMessage> reregisterExecutor =
- FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
-
- Future<SlaveReregisteredMessage> slaveReregisteredMessage =
- FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
- Future<DrainSlaveMessage> drainSlaveMesage =
- FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
-
- Try<Owned<cluster::Slave>> recoveredSlave =
- StartSlave(detector.get(), agentFlags);
- ASSERT_SOME(recoveredSlave);
-
- AWAIT_READY(reregisterExecutor);
- Clock::advance(agentFlags.executor_reregistration_timeout);
- Clock::settle();
- Clock::advance(agentFlags.registration_backoff_factor);
- Clock::settle();
- AWAIT_READY(slaveReregisteredMessage);
-
- // The agent should be told to drain once it reregisters.
- AWAIT_READY(drainSlaveMesage);
- AWAIT_READY(runningUpdate2);
- AWAIT_READY(killedUpdate);
-}
-
-
class AgentAPITest
: public MesosTest,
public WithParamInterface<ContentType>
diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp
new file mode 100644
index 0000000..16d0c85
--- /dev/null
+++ b/src/tests/master_draining_tests.cpp
@@ -0,0 +1,662 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+
+#include <mesos/v1/master/master.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/cluster.hpp"
+#include "tests/mesos.hpp"
+#include "tests/resources_utils.hpp"
+
+namespace http = process::http;
+
+
+using mesos::master::detector::MasterDetector;
+
+using process::Clock;
+using process::Failure;
+using process::Future;
+using process::Owned;
+
+using testing::_;
+using testing::AllOf;
+using testing::DoAll;
+using testing::Return;
+using testing::Sequence;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class MasterDrainingTest
+ : public MesosTest,
+ public WithParamInterface<ContentType>
+{
+public:
+ master::Flags CreateMasterFlags() override
+ {
+ // Turn off periodic allocations to avoid the race between
+ // `HierarchicalAllocator::updateAvailable()` and periodic allocations.
+ master::Flags flags = MesosTest::CreateMasterFlags();
+ flags.allocation_interval = Seconds(1000);
+ return flags;
+ }
+
+ // Helper function to post a request to "/api/v1" master endpoint and return
+ // the response.
+ Future<v1::master::Response> post(
+ const process::PID<master::Master>& pid,
+ const v1::master::Call& call,
+ const ContentType& contentType,
+ const Credential& credential = DEFAULT_CREDENTIAL)
+ {
+ http::Headers headers = createBasicAuthHeaders(credential);
+ headers["Accept"] = stringify(contentType);
+
+ return http::post(
+ pid,
+ "api/v1",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType))
+ .then([contentType](const http::Response& response)
+ -> Future<v1::master::Response> {
+ if (response.status != http::OK().status) {
+ return Failure("Unexpected response status " + response.status);
+ }
+ return deserialize<v1::master::Response>(contentType, response.body);
+ });
+ }
+};
+
+
+// These tests are parameterized by the content type of the HTTP request.
+INSTANTIATE_TEST_CASE_P(
+ ContentType,
+ MasterDrainingTest,
+ ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// When an operator submits a DRAIN_AGENT call, the agent should kill all
+// running tasks.
+TEST_P(MasterDrainingTest, DrainAgent)
+{
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ slave::Flags agentFlags = CreateSlaveFlags();
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+ ASSERT_SOME(slave);
+
+ Clock::advance(agentFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(subscribed);
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
+
+ Try<v1::Resources> resources =
+ v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+ ASSERT_SOME(resources);
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+ testing::Sequence updateSequence;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&runningUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillRepeatedly(Return());
+
+ mesos->send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH({taskInfo})}));
+
+ AWAIT_READY(startingUpdate);
+ AWAIT_READY(runningUpdate);
+
+ Future<v1::scheduler::Event::Update> killedUpdate;
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+ .WillOnce(DoAll(
+ FutureArg<1>(&killedUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ Future<Nothing> registrarApplyDrained;
+ EXPECT_CALL(*master.get()->registrar, apply(_))
+ .WillOnce(DoDefault())
+ .WillOnce(DoAll(
+ FutureSatisfy(&registrarApplyDrained),
+ Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply)));
+
+ ContentType contentType = GetParam();
+
+ {
+ v1::master::Call::DrainAgent drainAgent;
+ drainAgent.mutable_agent_id()->CopyFrom(agentId);
+ drainAgent.mutable_max_grace_period()->set_seconds(0);
+
+ v1::master::Call call;
+ call.set_type(v1::master::Call::DRAIN_AGENT);
+ call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+ post(master.get()->pid, call, contentType);
+ }
+
+ AWAIT_READY(killedUpdate);
+ AWAIT_READY(registrarApplyDrained);
+
+ // Ensure that the update acknowledgement has been processed.
+ Clock::settle();
+
+ mesos::v1::DrainInfo drainInfo;
+ drainInfo.set_state(mesos::v1::DRAINED);
+ drainInfo.mutable_config()->set_mark_gone(false);
+ drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0);
+
+ // Ensure that the agent's drain info is reflected in the master's
+ // GET_AGENTS response.
+ {
+ v1::master::Call call;
+ call.set_type(v1::master::Call::GET_AGENTS);
+
+ Future<v1::master::Response> response =
+ post(master.get()->pid, call, contentType);
+
+ AWAIT_READY(response);
+ ASSERT_TRUE(response->IsInitialized());
+ ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());
+ ASSERT_EQ(response->get_agents().agents_size(), 1);
+
+ const v1::master::Response::GetAgents::Agent& agent =
+ response->get_agents().agents(0);
+
+ EXPECT_EQ(agent.deactivated(), true);
+
+ EXPECT_EQ(agent.drain_info(), drainInfo);
+ EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds());
+ }
+
+ // Ensure that the agent's drain info is reflected in the master's
+ // '/state' response.
+ {
+ Future<process::http::Response> response = process::http::get(
+ master.get()->pid,
+ "state",
+ None(),
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+ ASSERT_SOME(parse);
+
+ Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+ "slaves[0].drain_info");
+
+ ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+
+ Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>(
+ "slaves[0].estimated_drain_start_time_seconds");
+
+ ASSERT_SOME(stateDrainStartTime);
+ EXPECT_LT(0, stateDrainStartTime->as<int>());
+ }
+
+ // Ensure that the agent's drain info is reflected in the master's
+ // '/state-summary' response.
+ {
+ Future<process::http::Response> response = process::http::get(
+ master.get()->pid,
+ "state-summary",
+ None(),
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+ ASSERT_SOME(parse);
+
+ Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>(
+ "slaves[0].drain_info");
+
+ ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo);
+
+ Result<JSON::Number> stateDrainStartTime =
+ parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds");
+
+ ASSERT_SOME(stateDrainStartTime);
+ EXPECT_LT(0, stateDrainStartTime->as<int>());
+ }
+}
+
+
+// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the
+// agent should kill all running tasks and the master should mark the agent gone
+// once terminal ACKs have been received.
+TEST_P(MasterDrainingTest, DrainAgentMarkGone)
+{
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ slave::Flags agentFlags = CreateSlaveFlags();
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+ ASSERT_SOME(slave);
+
+ Clock::advance(agentFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.add_capabilities()->set_type(
+ v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(subscribed);
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
+
+ Try<v1::Resources> resources =
+ v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+ ASSERT_SOME(resources);
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+ testing::Sequence updateSequence;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&runningUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillRepeatedly(Return());
+
+ mesos->send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH({taskInfo})}));
+
+ AWAIT_READY(startingUpdate);
+ AWAIT_READY(runningUpdate);
+
+ Future<v1::scheduler::Event::Update> goneUpdate;
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
+ .WillOnce(DoAll(
+ FutureArg<1>(&goneUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ // When the terminal ACK is received by the master, the agent should be marked
+ // gone, which entails sending a `ShutdownMessage`.
+ Future<ShutdownMessage> shutdownMessage =
+ FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+ ContentType contentType = GetParam();
+
+ {
+ v1::master::Call::DrainAgent drainAgent;
+ drainAgent.mutable_agent_id()->CopyFrom(agentId);
+ drainAgent.set_mark_gone(true);
+
+ v1::master::Call call;
+ call.set_type(v1::master::Call::DRAIN_AGENT);
+ call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+ post(master.get()->pid, call, contentType);
+ }
+
+ AWAIT_READY(goneUpdate);
+ AWAIT_READY(shutdownMessage);
+}
+
+
+// An operator may issue DRAIN_AGENT against an agent that has become
+// unreachable. The call is expected to succeed, with the actual draining
+// deferred until the agent rejoins the cluster: once it reregisters it
+// should be told to drain, and its task should end up TASK_KILLED.
+TEST_P(MasterDrainingTest, DrainAgentUnreachable)
+{
+  // Time is advanced by hand for the remainder of the test.
+  Clock::pause();
+
+  master::Flags masterOptions = CreateMasterFlags();
+  Try<Owned<cluster::Master>> masterInstance = StartMaster(masterOptions);
+  ASSERT_SOME(masterInstance);
+
+  Future<SlaveRegisteredMessage> agentRegistered =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentOptions = CreateSlaveFlags();
+  Owned<MasterDetector> masterDetector =
+    masterInstance.get()->createDetector();
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(masterDetector.get(), agentOptions);
+  ASSERT_SOME(agent);
+
+  // Step over the registration backoff, then wait for the registration.
+  Clock::advance(agentOptions.registration_backoff_factor);
+
+  AWAIT_READY(agentRegistered);
+
+  std::shared_ptr<v1::MockHTTPScheduler> mockScheduler =
+    std::make_shared<v1::MockHTTPScheduler>();
+
+  // The framework enables checkpointing and is PARTITION_AWARE.
+  // NOTE(review): checkpointing is presumably what lets the task survive
+  // the agent restart further down -- confirm.
+  v1::FrameworkInfo framework = v1::DEFAULT_FRAMEWORK_INFO;
+  framework.set_checkpoint(true);
+  framework.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*mockScheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(framework));
+
+  Future<v1::scheduler::Event::Subscribed> subscribedEvent;
+  EXPECT_CALL(*mockScheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribedEvent));
+
+  // Heartbeats carry no information for this test.
+  EXPECT_CALL(*mockScheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Capture the first offers event only; any further ones are ignored.
+  Future<v1::scheduler::Event::Offers> firstOffers;
+  EXPECT_CALL(*mockScheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&firstOffers))
+    .WillRepeatedly(Return());
+
+  std::shared_ptr<v1::scheduler::TestMesos> schedulerClient =
+    std::make_shared<v1::scheduler::TestMesos>(
+        masterInstance.get()->pid, ContentType::PROTOBUF, mockScheduler);
+
+  AWAIT_READY(subscribedEvent);
+  v1::FrameworkID frameworkId = subscribedEvent->framework_id();
+
+  AWAIT_READY(firstOffers);
+  ASSERT_FALSE(firstOffers->offers().empty());
+
+  const v1::Offer& offer = firstOffers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> taskResources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(taskResources);
+
+  v1::TaskInfo task =
+    v1::createTask(agentId, taskResources.get(), SLEEP_COMMAND(1000));
+  const v1::TaskID& taskId = task.task_id();
+
+  // Every status update expectation below is chained into one sequence,
+  // so the updates must arrive in exactly the order they are declared.
+  testing::Sequence updateOrder;
+  Future<v1::scheduler::Event::Update> startingEvent;
+  Future<v1::scheduler::Event::Update> runningEvent;
+  Future<v1::scheduler::Event::Update> unreachableEvent;
+  Future<v1::scheduler::Event::Update> runningAgainEvent;
+  Future<v1::scheduler::Event::Update> killedEvent;
+
+  // Two status update acknowledgement messages are captured so the test
+  // can be certain both were delivered before the agent is forced offline.
+  Future<StatusUpdateAcknowledgementMessage> firstAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<StatusUpdateAcknowledgementMessage> secondAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  // 1) TASK_STARTING.
+  EXPECT_CALL(
+      *mockScheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskId),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateOrder)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingEvent),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // 2) TASK_RUNNING.
+  EXPECT_CALL(
+      *mockScheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskId),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateOrder)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningEvent),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // 3) TASK_UNREACHABLE, after the agent has been taken away.
+  EXPECT_CALL(
+      *mockScheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskId),
+          TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+    .InSequence(updateOrder)
+    .WillOnce(DoAll(
+        FutureArg<1>(&unreachableEvent),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // 4) A second TASK_RUNNING once the agent has returned ...
+  EXPECT_CALL(
+      *mockScheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskId),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateOrder)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningAgainEvent),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // 5) ... followed by TASK_KILLED as a result of the drain.
+  EXPECT_CALL(
+      *mockScheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskId),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(updateOrder)
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedEvent),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // Launch the task on the offered agent.
+  schedulerClient->send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({task})}));
+
+  AWAIT_READY(startingEvent);
+  AWAIT_READY(firstAck);
+  AWAIT_READY(runningEvent);
+  AWAIT_READY(secondAck);
+
+  // Take the agent down abruptly, as a crash would, so that the master
+  // loses its connection to it.
+  agent.get()->terminate();
+  agent->reset();
+
+  // Let the master's agent reregistration timeout run out, then wait for
+  // the resulting TASK_UNREACHABLE update.
+  Clock::advance(masterOptions.agent_reregister_timeout);
+  AWAIT_READY(unreachableEvent);
+
+  // Ask the master to drain the agent while it is still unreachable. The
+  // request goes out in the content type this test is parameterized on.
+  {
+    ContentType requestFormat = GetParam();
+
+    v1::master::Call drainCall;
+    drainCall.set_type(v1::master::Call::DRAIN_AGENT);
+    drainCall.mutable_drain_agent()->mutable_agent_id()->CopyFrom(agentId);
+
+    post(masterInstance.get()->pid, drainCall, requestFormat);
+  }
+
+  // Set up the messages to watch for, then start the agent again.
+  Future<ReregisterExecutorMessage> executorReregistering =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> agentReregistered =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainRequest =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> restartedAgent =
+    StartSlave(masterDetector.get(), agentOptions);
+  ASSERT_SOME(restartedAgent);
+
+  // Step through the executor reregistration timeout and then the
+  // registration backoff, settling the clock after each advance.
+  AWAIT_READY(executorReregistering);
+  Clock::advance(agentOptions.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentOptions.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(agentReregistered);
+
+  // After reregistering, the agent should receive the drain request and
+  // the scheduler should observe TASK_RUNNING followed by TASK_KILLED.
+  AWAIT_READY(drainRequest);
+  AWAIT_READY(runningAgainEvent);
+  AWAIT_READY(killedEvent);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {