You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/17 19:43:40 UTC
git commit: Fixed MESOS-947: Slave should properly handle a
killTask() that arrives between runTask() and _runTask().
Repository: mesos
Updated Branches:
refs/heads/master 30d12fa1e -> 4356e4f73
Fixed MESOS-947: Slave should properly handle a killTask() that arrives
between runTask() and _runTask().
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4356e4f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4356e4f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4356e4f7
Branch: refs/heads/master
Commit: 4356e4f73c9c683561240e13217c2e7b19ccb036
Parents: 30d12fa
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Fri Oct 17 10:41:47 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 17 10:43:00 2014 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 57 +++++++++++++++-----
src/slave/slave.hpp | 12 +++--
src/tests/mesos.cpp | 62 ++++++++++++++++++++++
src/tests/mesos.hpp | 63 ++++++++++++++++++++++
src/tests/slave_tests.cpp | 116 +++++++++++++++++++++++++++++++++++++++++
5 files changed, 294 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0e342ed..7b5474a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1095,7 +1095,7 @@ void Slave::runTask(
LOG(WARNING) << "Ignoring task " << task.task_id()
<< " because the slave is " << state;
// TODO(vinod): Consider sending a TASK_LOST here.
- // Currently it is tricky because 'statsuUpdate()'
+ // Currently it is tricky because 'statusUpdate()'
// ignores updates for unknown frameworks.
return;
}
@@ -1189,16 +1189,28 @@ void Slave::_runTask(
LOG(INFO) << "Launching task " << task.task_id()
<< " for framework " << frameworkId;
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING) << "Ignoring run task " << task.task_id()
+ << " because the framework " << frameworkId
+ << " does not exist";
+ return;
+ }
+
const ExecutorInfo& executorInfo = getExecutorInfo(frameworkId, task);
const ExecutorID& executorId = executorInfo.executor_id();
- // Remove the pending task from framework.
- Framework* framework = getFramework(frameworkId);
- CHECK_NOTNULL(framework);
-
- framework->pending[executorId].erase(task.task_id());
- if (framework->pending[executorId].empty()) {
- framework->pending.erase(executorId);
+ if (framework->pending.contains(executorId) &&
+ framework->pending[executorId].contains(task.task_id())) {
+ framework->pending[executorId].erase(task.task_id());
+ if (framework->pending[executorId].empty()) {
+ framework->pending.erase(executorId);
+ }
+ } else {
+ LOG(WARNING) << "Ignoring run task " << task.task_id()
+ << " of framework " << frameworkId
+ << " because the task has been killed in the meantime";
+ return;
}
// We don't send a status update here because a terminating
@@ -1395,14 +1407,35 @@ void Slave::killTask(
return;
}
+ foreachkey (const ExecutorID& executorId, framework->pending) {
+ if (framework->pending[executorId].contains(taskId)) {
+ LOG(WARNING) << "Killing task " << taskId
+ << " of framework " << frameworkId
+ << " before it was launched";
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId, info.id(), taskId, TASK_KILLED,
+ "Task killed before it was launched");
+ statusUpdate(update, UPID());
+
+ framework->pending[executorId].erase(taskId);
+ if (framework->pending[executorId].empty()) {
+ framework->pending.erase(executorId);
+ if (framework->pending.empty() && framework->executors.empty()) {
+ removeFramework(framework);
+ }
+ }
+ return;
+ }
+ }
+
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
- LOG(WARNING) << "Cannot kill task " << taskId
+ LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
<< " because no corresponding executor is running";
-
- // We send a TASK_LOST update because this task might have never
- // been launched on this slave!
+ // We send a TASK_LOST update because this task has never
+ // been launched on this slave.
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor");
http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 342b09f..ccc0e03 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -106,14 +106,16 @@ public:
void doReliableRegistration(const Duration& duration);
- void runTask(
+ // Made 'virtual' for Slave mocking.
+ virtual void runTask(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
const std::string& pid,
const TaskInfo& task);
- void _runTask(
+ // Made 'virtual' for Slave mocking.
+ virtual void _runTask(
const process::Future<bool>& future,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
@@ -122,7 +124,8 @@ public:
process::Future<bool> unschedule(const std::string& path);
- void killTask(
+ // Made 'virtual' for Slave mocking.
+ virtual void killTask(
const process::UPID& from,
const FrameworkID& frameworkId,
const TaskID& taskId);
@@ -320,7 +323,8 @@ public:
void removeExecutor(Framework* framework, Executor* executor);
// Removes and garbage collects the framework.
- void removeFramework(Framework* framework);
+ // Made 'virtual' for Slave mocking.
+ virtual void removeFramework(Framework* framework);
// Schedules a 'path' for gc based on its modification time.
Future<Nothing> garbageCollect(const std::string& path);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 3dcb2ac..147e23f 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -48,6 +48,8 @@
#include "tests/mesos.hpp"
using std::string;
+using testing::_;
+using testing::Invoke;
using namespace process;
@@ -338,6 +340,66 @@ void MesosTest::ShutdownSlaves()
}
+MockSlave::MockSlave(const slave::Flags& flags,
+ MasterDetector* detector,
+ slave::Containerizer* containerizer)
+ : slave::Slave(
+ flags,
+ detector,
+ containerizer,
+ &files, // These three members are constructed after the
+ &gc, // slave::Slave base, so only their addresses are valid
+ &statusUpdateManager) // here; presumably the base only stores them.
+{
+ // Defaults: each mocked call runs the real method; tests may override these.
+ EXPECT_CALL(*this, runTask(_, _, _, _, _)).
+ WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
+ EXPECT_CALL(*this, _runTask(_, _, _, _, _)).
+ WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
+ EXPECT_CALL(*this, killTask(_, _, _)).
+ WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
+ EXPECT_CALL(*this, removeFramework(_)).
+ WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
+}
+
+// Runs the real slave::Slave::runTask(), bypassing the gmock dispatch.
+void MockSlave::unmocked_runTask(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task)
+{
+ slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task);
+}
+
+// Runs the real slave::Slave::_runTask(), bypassing the gmock dispatch.
+void MockSlave::unmocked__runTask(
+ const process::Future<bool>& future,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task)
+{
+ slave::Slave::_runTask(future, frameworkInfo, frameworkId, pid, task);
+}
+
+// Runs the real slave::Slave::killTask(), bypassing the gmock dispatch.
+void MockSlave::unmocked_killTask(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId)
+{
+ slave::Slave::killTask(from, frameworkId, taskId);
+}
+
+// Runs the real slave::Slave::removeFramework(), bypassing the gmock dispatch.
+void MockSlave::unmocked_removeFramework(slave::Framework* framework)
+{
+ slave::Slave::removeFramework(framework);
+}
+
+
slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags()
{
slave::Flags flags = MesosTest::CreateSlaveFlags();
http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 957e223..e40575c 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -538,6 +538,69 @@ public:
};
+// Mock of slave::Slave for gmock tests covering races between runTask and
+// killTask. Each 'unmocked_X' method runs the real slave::Slave::X.
+class MockSlave : public slave::Slave
+{
+public:
+ MockSlave(
+ const slave::Flags& flags,
+ MasterDetector* detector,
+ slave::Containerizer* containerizer);
+
+ virtual ~MockSlave() {}
+
+ MOCK_METHOD5(runTask, void(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task));
+
+ void unmocked_runTask(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task);
+
+ MOCK_METHOD5(_runTask, void(
+ const process::Future<bool>& future,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task));
+
+ void unmocked__runTask(
+ const process::Future<bool>& future,
+ const FrameworkInfo& frameworkInfo,
+ const FrameworkID& frameworkId,
+ const std::string& pid,
+ const TaskInfo& task);
+
+ MOCK_METHOD3(killTask, void(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId));
+
+ void unmocked_killTask(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId);
+
+ MOCK_METHOD1(removeFramework, void(
+ slave::Framework* framework));
+
+ void unmocked_removeFramework(
+ slave::Framework* framework);
+
+private:
+ Files files; // Passed by address to the slave::Slave constructor.
+ MockGarbageCollector gc; // Likewise.
+ slave::StatusUpdateManager statusUpdateManager; // Likewise.
+};
+
+
// Definition of a MockAuthorizer that can be used in tests with gmock.
class MockAuthorizer : public Authorizer
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f585bdd..a1bd1d4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -81,7 +81,10 @@ using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
using testing::Return;
+using testing::SaveArg;
// Those of the overall Mesos master/slave/scheduler/driver tests
// that seem vaguely more slave than master-related are in this file.
@@ -1035,3 +1038,116 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
AWAIT_READY(detected);
AWAIT_READY(slaveReregisteredMessage);
}
+
+// This test ensures that a killTask() can happen between runTask()
+// and _runTask() and then gets "handled properly". This means that
+// the task never gets started, but also does not get lost. The end
+// result is status TASK_KILLED. Essentially, killing the task is
+// realized while preparing to start it. See MESOS-947.
+TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ StandaloneMasterDetector detector(master.get());
+
+ MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
+ process::spawn(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(0);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .Times(0);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(0);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillRepeatedly(FutureArg<1>(&status));
+
+ EXPECT_CALL(slave, runTask(_, _, _, _, _))
+ .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask));
+
+ // Saved arguments from Slave::_runTask().
+ Future<bool> future;
+ FrameworkInfo frameworkInfo;
+ FrameworkID frameworkId;
+
+ // Skip what Slave::_runTask() normally does, save its arguments for
+ // later, tie reaching the critical moment when to kill the task to
+ // a future.
+ Future<Nothing> _runTask;
+ EXPECT_CALL(slave, _runTask(_, _, _, _, _))
+ .WillOnce(DoAll(FutureSatisfy(&_runTask),
+ SaveArg<0>(&future),
+ SaveArg<1>(&frameworkInfo),
+ SaveArg<2>(&frameworkId)));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(_runTask);
+
+ Future<Nothing> killTask;
+ EXPECT_CALL(slave, killTask(_, _, _))
+ .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
+ FutureSatisfy(&killTask)));
+
+ // Since this is the only task ever for this framework, killing it
+ // empties the framework, so Slave::killTask() removes the framework.
+ // Expect this before sending the kill so the call cannot be missed.
+ Future<Nothing> removeFramework;
+ EXPECT_CALL(slave, removeFramework(_))
+ .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
+ FutureSatisfy(&removeFramework)));
+
+ driver.killTask(task.task_id());
+ AWAIT_READY(killTask);
+ slave.unmocked__runTask(
+ future, frameworkInfo, frameworkId, master.get(), task);
+
+ AWAIT_READY(removeFramework);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_KILLED, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ process::terminate(slave);
+ process::wait(slave);
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}