You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/06 06:33:07 UTC
[13/13] mesos git commit: Added a test for killing the executor
during task launch.
Added a test for killing the executor during task launch.
This test verifies that when the agent shuts down a running
executor, tasks that are still launching on the agent and use
the same executor are dropped.
Review: https://reviews.apache.org/r/66347/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06d8d759
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06d8d759
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06d8d759
Branch: refs/heads/master
Commit: 06d8d759a4d7d85e291644711cecd835825d54dd
Parents: 731136a
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Thu Apr 5 17:44:53 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Apr 5 17:59:05 2018 -0700
----------------------------------------------------------------------
src/tests/slave_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 170 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/06d8d759/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 2f90d1d..646a2b9 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -5641,6 +5641,176 @@ TEST_F(SlaveTest, LaunchTaskGroupsUsingSameExecutorKillLaterTaskGroup)
}
+// This test verifies that when the agent shuts down a running executor, task
+// groups still launching on that agent with the same executor are dropped.
+TEST_F(SlaveTest, ShutdownExecutorWhileTaskLaunching)
+{
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ // Start a mock slave so `_run` and `shutdownExecutor` can be intercepted.
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), slaveFlags, true);
+
+ ASSERT_SOME(slave);
+ ASSERT_NE(nullptr, slave.get()->mock());
+ slave.get()->start();
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ EXPECT_CALL(*scheduler, failure(_, _))
+ .Times(AtMost(1));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ // Advance the clock to trigger both agent registration and a batch
+ // allocation.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+ Clock::advance(masterFlags.allocation_interval);
+
+ AWAIT_READY(subscribed);
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
+
+ v1::Resources resources =
+ v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ "default1", None(), resources, v1::ExecutorInfo::DEFAULT, frameworkId);
+
+ // Create two separate task groups that use the same executor.
+ v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "sleep 1000");
+ v1::TaskGroupInfo taskGroup1 = v1::createTaskGroupInfo({taskInfo1});
+
+ v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, "sleep 1000");
+ v1::TaskGroupInfo taskGroup2 = v1::createTaskGroupInfo({taskInfo2});
+
+ v1::Offer::Operation launchGroup1 =
+ v1::LAUNCH_GROUP(executorInfo, taskGroup1);
+ v1::Offer::Operation launchGroup2 =
+ v1::LAUNCH_GROUP(executorInfo, taskGroup2);
+
+ Future<v1::scheduler::Event::Update> task1Starting, task1Running;
+ Future<v1::scheduler::Event::Update> task2Lost;
+ EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo1)))
+ .WillOnce(DoAll(
+ FutureArg<1>(&task1Starting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(DoAll(
+ FutureArg<1>(&task1Running),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+ EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo2)))
+ .WillOnce(DoAll(
+ FutureArg<1>(&task2Lost),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ // Saved arguments from `Slave::_run()` for `taskGroup2`, replayed below.
+ FrameworkInfo _frameworkInfo;
+ ExecutorInfo _executorInfo;
+ Option<TaskGroupInfo> _taskGroup;
+ Option<TaskInfo> _task;
+ vector<ResourceVersionUUID> _resourceVersionUuids;
+ Option<bool> _launchExecutor;
+
+ // Pause the launch of `taskGroup2` at `_run` by returning a pending future.
+ Promise<Nothing> promiseTask2;
+ Future<Nothing> runTask2;
+ EXPECT_CALL(
+ *slave.get()->mock(),
+ _run(_, _, _,
+ OptionTaskGroupHasTaskID(devolve(taskInfo2.task_id())),
+ _, _))
+ .WillOnce(
+ DoAll(FutureSatisfy(&runTask2),
+ SaveArg<0>(&_frameworkInfo),
+ SaveArg<1>(&_executorInfo),
+ SaveArg<2>(&_task),
+ SaveArg<3>(&_taskGroup),
+ SaveArg<4>(&_resourceVersionUuids),
+ SaveArg<5>(&_launchExecutor),
+ Return(promiseTask2.future())));
+
+ // Launch the two task groups in a single accept call.
+ mesos.send(
+ v1::createCallAccept(frameworkId, offer, {launchGroup1, launchGroup2}));
+
+ AWAIT_READY(runTask2);
+
+ // `taskGroup1` launches successfully.
+ AWAIT_READY(task1Starting);
+ EXPECT_EQ(v1::TASK_STARTING, task1Starting->status().state());
+
+ AWAIT_READY(task1Running);
+ EXPECT_EQ(v1::TASK_RUNNING, task1Running->status().state());
+
+ // Shut down the executor while `taskGroup2` is still launching.
+ Future<Nothing> shutdownExecutor;
+ EXPECT_CALL(*slave.get()->mock(), shutdownExecutor(_, _, _))
+ .WillOnce(DoAll(
+ Invoke(slave.get()->mock(), &MockSlave::unmocked_shutdownExecutor),
+ FutureSatisfy(&shutdownExecutor)));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SHUTDOWN);
+
+ Call::Shutdown* shutdown = call.mutable_shutdown();
+ shutdown->mutable_executor_id()->CopyFrom(executorInfo.executor_id());
+ shutdown->mutable_agent_id()->CopyFrom(agentId);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(shutdownExecutor);
+
+ // Resume launching `taskGroup2` by replaying the saved `_run` arguments.
+ Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+ return slave.get()->mock()->unmocked__run(
+ _frameworkInfo,
+ _executorInfo,
+ _task,
+ _taskGroup,
+ _resourceVersionUuids,
+ _launchExecutor);
+ });
+
+ promiseTask2.associate(unmocked__run);
+
+ // `taskGroup2` is dropped because the executor is terminated.
+ AWAIT_READY(task2Lost);
+ EXPECT_EQ(v1::TASK_LOST, task2Lost->status().state());
+}
+
+
// This test ensures that agent sends ExitedExecutorMessage when the task
// fails to launch due to task authorization failure and that master's executor
// bookkeeping entry is removed.