You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/06 06:33:07 UTC

[13/13] mesos git commit: Added a test for killing the executor during task launch.

Added a test for killing the executor during task launch.

This test verifies that when the agent shuts down a running
executor, tasks that are still being launched on the agent and
that use the same executor are dropped.

Review: https://reviews.apache.org/r/66347/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06d8d759
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06d8d759
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06d8d759

Branch: refs/heads/master
Commit: 06d8d759a4d7d85e291644711cecd835825d54dd
Parents: 731136a
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Thu Apr 5 17:44:53 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Apr 5 17:59:05 2018 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/06d8d759/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 2f90d1d..646a2b9 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -5641,6 +5641,176 @@ TEST_F(SlaveTest, LaunchTaskGroupsUsingSameExecutorKillLaterTaskGroup)
 }
 
 
+// This test verifies that when the agent shuts down a running executor, tasks
+// being launched on the agent with the same executor are dropped (TASK_LOST).
+TEST_F(SlaveTest, ShutdownExecutorWhileTaskLaunching)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Start a mock slave so `_run` and `shutdownExecutor` can be intercepted.
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), slaveFlags, true);
+
+  ASSERT_SOME(slave);
+  ASSERT_NE(nullptr, slave.get()->mock());
+  slave.get()->start();
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  // Advance the clock to trigger both agent registration and a batch
+  // allocation.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(subscribed);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      "default1", None(), resources, v1::ExecutorInfo::DEFAULT, frameworkId);
+
+  // Create two separate task groups that use the same executor.
+  v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup1 = v1::createTaskGroupInfo({taskInfo1});
+
+  v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup2 = v1::createTaskGroupInfo({taskInfo2});
+
+  v1::Offer::Operation launchGroup1 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup1);
+  v1::Offer::Operation launchGroup2 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup2);
+
+  Future<v1::scheduler::Event::Update> task1Starting, task1Running;
+  Future<v1::scheduler::Event::Update> task2Lost;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo1)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task1Starting),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task1Running),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo2)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task2Lost),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // Arguments saved from the mocked `_run()` call for `taskGroup2`.
+  FrameworkInfo _frameworkInfo;
+  ExecutorInfo _executorInfo;
+  Option<TaskGroupInfo> _taskGroup;
+  Option<TaskInfo> _task;
+  vector<ResourceVersionUUID> _resourceVersionUuids;
+  Option<bool> _launchExecutor;
+
+  // Pause the launch of `taskGroup2` at `_run` by returning a pending future.
+  Promise<Nothing> promiseTask2;
+  Future<Nothing> runTask2;
+  EXPECT_CALL(
+      *slave.get()->mock(),
+      _run(_, _, _,
+           OptionTaskGroupHasTaskID(devolve(taskInfo2.task_id())),
+           _, _))
+    .WillOnce(
+        DoAll(FutureSatisfy(&runTask2),
+        SaveArg<0>(&_frameworkInfo),
+        SaveArg<1>(&_executorInfo),
+        SaveArg<2>(&_task),
+        SaveArg<3>(&_taskGroup),
+        SaveArg<4>(&_resourceVersionUuids),
+        SaveArg<5>(&_launchExecutor),
+        Return(promiseTask2.future())));
+
+  // Launch the two task groups in a single ACCEPT call.
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {launchGroup1, launchGroup2}));
+
+  AWAIT_READY(runTask2);
+
+  // `taskGroup1` launches successfully.
+  AWAIT_READY(task1Starting);
+  EXPECT_EQ(v1::TASK_STARTING, task1Starting->status().state());
+
+  AWAIT_READY(task1Running);
+  EXPECT_EQ(v1::TASK_RUNNING, task1Running->status().state());
+
+  // Shut down the executor while `taskGroup2` is still launching.
+  Future<Nothing> shutdownExecutor;
+  EXPECT_CALL(*slave.get()->mock(), shutdownExecutor(_, _, _))
+    .WillOnce(DoAll(
+        Invoke(slave.get()->mock(), &MockSlave::unmocked_shutdownExecutor),
+        FutureSatisfy(&shutdownExecutor)));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SHUTDOWN);
+
+    Call::Shutdown* shutdown = call.mutable_shutdown();
+    shutdown->mutable_executor_id()->CopyFrom(executorInfo.executor_id());
+    shutdown->mutable_agent_id()->CopyFrom(agentId);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(shutdownExecutor);
+
+  // Resume launching `taskGroup2` with the arguments saved from `_run`.
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
+        _frameworkInfo,
+        _executorInfo,
+        _task,
+        _taskGroup,
+        _resourceVersionUuids,
+        _launchExecutor);
+  });
+
+  promiseTask2.associate(unmocked__run);
+
+  // `taskGroup2` is dropped because the executor is terminated.
+  AWAIT_READY(task2Lost);
+  EXPECT_EQ(v1::TASK_LOST, task2Lost->status().state());
+}
+
+
 // This test ensures that agent sends ExitedExecutorMessage when the task
 // fails to launch due to task authorization failure and that master's executor
 // bookkeeping entry is removed.