You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/17 19:43:40 UTC

git commit: Fixed MESOS-947: Slave should properly handle a killTask() that arrives between runTask() and _runTask().

Repository: mesos
Updated Branches:
  refs/heads/master 30d12fa1e -> 4356e4f73


Fixed MESOS-947: Slave should properly handle a killTask() that arrives
between runTask() and _runTask().


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4356e4f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4356e4f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4356e4f7

Branch: refs/heads/master
Commit: 4356e4f73c9c683561240e13217c2e7b19ccb036
Parents: 30d12fa
Author: Bernd Mathiske <be...@mesosphere.io>
Authored: Fri Oct 17 10:41:47 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 17 10:43:00 2014 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       |  57 +++++++++++++++-----
 src/slave/slave.hpp       |  12 +++--
 src/tests/mesos.cpp       |  62 ++++++++++++++++++++++
 src/tests/mesos.hpp       |  63 ++++++++++++++++++++++
 src/tests/slave_tests.cpp | 116 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 294 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0e342ed..7b5474a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1095,7 +1095,7 @@ void Slave::runTask(
     LOG(WARNING) << "Ignoring task " << task.task_id()
                  << " because the slave is " << state;
     // TODO(vinod): Consider sending a TASK_LOST here.
-    // Currently it is tricky because 'statsuUpdate()'
+    // Currently it is tricky because 'statusUpdate()'
     // ignores updates for unknown frameworks.
     return;
   }
@@ -1189,16 +1189,28 @@ void Slave::_runTask(
   LOG(INFO) << "Launching task " << task.task_id()
             << " for framework " << frameworkId;
 
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+     LOG(WARNING) << "Ignoring run task " << task.task_id()
+                  << " because the framework " << frameworkId
+                  << " does not exist";
+     return;
+  }
+
   const ExecutorInfo& executorInfo = getExecutorInfo(frameworkId, task);
   const ExecutorID& executorId = executorInfo.executor_id();
 
-  // Remove the pending task from framework.
-  Framework* framework = getFramework(frameworkId);
-  CHECK_NOTNULL(framework);
-
-  framework->pending[executorId].erase(task.task_id());
-  if (framework->pending[executorId].empty()) {
-    framework->pending.erase(executorId);
+  if (framework->pending.contains(executorId) &&
+      framework->pending[executorId].contains(task.task_id())) {
+    framework->pending[executorId].erase(task.task_id());
+    if (framework->pending[executorId].empty()) {
+        framework->pending.erase(executorId);
+    }
+  } else {
+    LOG(WARNING) << "Ignoring run task " << task.task_id()
+                 << " of framework " << frameworkId
+                 << " because the task has been killed in the meantime";
+    return;
   }
 
   // We don't send a status update here because a terminating
@@ -1395,14 +1407,35 @@ void Slave::killTask(
     return;
   }
 
+  foreachkey (const ExecutorID& executorId, framework->pending) {
+    if (framework->pending[executorId].contains(taskId)) {
+      LOG(WARNING) << "Killing task " << taskId
+                   << " of framework " << frameworkId
+                   << " before it was launched";
+
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          frameworkId, info.id(), taskId, TASK_KILLED,
+          "Task killed before it was launched");
+      statusUpdate(update, UPID());
+
+      framework->pending[executorId].erase(taskId);
+      if (framework->pending[executorId].empty()) {
+          framework->pending.erase(executorId);
+          if (framework->pending.empty() && framework->executors.empty()) {
+            removeFramework(framework);
+          }
+      }
+      return;
+    }
+  }
+
   Executor* executor = framework->getExecutor(taskId);
   if (executor == NULL) {
-    LOG(WARNING) << "Cannot kill task " << taskId
+      LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId
                  << " because no corresponding executor is running";
-
-    // We send a TASK_LOST update because this task might have never
-    // been launched on this slave!
+    // We send a TASK_LOST update because this task has never
+    // been launched on this slave.
     const StatusUpdate& update = protobuf::createStatusUpdate(
         frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor");
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 342b09f..ccc0e03 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -106,14 +106,16 @@ public:
 
   void doReliableRegistration(const Duration& duration);
 
-  void runTask(
+  // Made 'virtual' for Slave mocking.
+  virtual void runTask(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
       const std::string& pid,
       const TaskInfo& task);
 
-  void _runTask(
+  // Made 'virtual' for Slave mocking.
+  virtual void _runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
@@ -122,7 +124,8 @@ public:
 
   process::Future<bool> unschedule(const std::string& path);
 
-  void killTask(
+  // Made 'virtual' for Slave mocking.
+  virtual void killTask(
       const process::UPID& from,
       const FrameworkID& frameworkId,
       const TaskID& taskId);
@@ -320,7 +323,8 @@ public:
   void removeExecutor(Framework* framework, Executor* executor);
 
   // Removes and garbage collects the framework.
-  void removeFramework(Framework* framework);
+  // Made 'virtual' for Slave mocking.
+  virtual void removeFramework(Framework* framework);
 
   // Schedules a 'path' for gc based on its modification time.
   Future<Nothing> garbageCollect(const std::string& path);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 3dcb2ac..147e23f 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -48,6 +48,8 @@
 #include "tests/mesos.hpp"
 
 using std::string;
+using testing::_;
+using testing::Invoke;
 
 using namespace process;
 
@@ -338,6 +340,66 @@ void MesosTest::ShutdownSlaves()
 }
 
 
+MockSlave::MockSlave(const slave::Flags& flags,
+                     MasterDetector* detector,
+                     slave::Containerizer* containerizer)
+  : slave::Slave(
+      flags,
+      detector,
+      containerizer,
+      &files,
+      &gc,
+      &statusUpdateManager)
+{
+  // Set up default behaviors, calling the original methods.
+  EXPECT_CALL(*this, runTask(_, _, _, _, _)).
+      WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
+  EXPECT_CALL(*this, _runTask(_, _, _, _, _)).
+      WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
+  EXPECT_CALL(*this, killTask(_, _, _)).
+      WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
+  EXPECT_CALL(*this, removeFramework(_)).
+      WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
+}
+
+
+void MockSlave::unmocked_runTask(
+    const process::UPID& from,
+    const FrameworkInfo& frameworkInfo,
+    const FrameworkID& frameworkId,
+    const std::string& pid,
+    const TaskInfo& task)
+{
+  slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task);
+}
+
+
+void MockSlave::unmocked__runTask(
+      const process::Future<bool>& future,
+      const FrameworkInfo& frameworkInfo,
+      const FrameworkID& frameworkId,
+      const std::string& pid,
+      const TaskInfo& task)
+{
+  slave::Slave::_runTask(future, frameworkInfo, frameworkId, pid, task);
+}
+
+
+void MockSlave::unmocked_killTask(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const TaskID& taskId)
+{
+  slave::Slave::killTask(from, frameworkId, taskId);
+}
+
+
+void MockSlave::unmocked_removeFramework(slave::Framework* framework)
+{
+  slave::Slave::removeFramework(framework);
+}
+
+
 slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags()
 {
   slave::Flags flags = MesosTest::CreateSlaveFlags();

http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 957e223..e40575c 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -538,6 +538,69 @@ public:
 };
 
 
+// Definition of a mock Slave to be used in tests with gmock, covering
+// potential races between runTask and killTask.
+class MockSlave : public slave::Slave
+{
+public:
+  MockSlave(
+      const slave::Flags& flags,
+      MasterDetector* detector,
+      slave::Containerizer* containerizer);
+
+  virtual ~MockSlave() {}
+
+  MOCK_METHOD5(runTask, void(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      const FrameworkID& frameworkId,
+      const std::string& pid,
+      const TaskInfo& task));
+
+  void unmocked_runTask(
+      const process::UPID& from,
+      const FrameworkInfo& frameworkInfo,
+      const FrameworkID& frameworkId,
+      const std::string& pid,
+      const TaskInfo& task);
+
+  MOCK_METHOD5(_runTask, void(
+      const process::Future<bool>& future,
+      const FrameworkInfo& frameworkInfo,
+      const FrameworkID& frameworkId,
+      const std::string& pid,
+      const TaskInfo& task));
+
+  void unmocked__runTask(
+      const process::Future<bool>& future,
+      const FrameworkInfo& frameworkInfo,
+      const FrameworkID& frameworkId,
+      const std::string& pid,
+      const TaskInfo& task);
+
+  MOCK_METHOD3(killTask, void(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const TaskID& taskId));
+
+  void unmocked_killTask(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const TaskID& taskId);
+
+  MOCK_METHOD1(removeFramework, void(
+      slave::Framework* framework));
+
+  void unmocked_removeFramework(
+      slave::Framework* framework);
+
+private:
+  Files files;
+  MockGarbageCollector gc;
+  slave::StatusUpdateManager statusUpdateManager;
+};
+
+
 // Definition of a MockAuthozier that can be used in tests with gmock.
 class MockAuthorizer : public Authorizer
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f585bdd..a1bd1d4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -81,7 +81,10 @@ using testing::_;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
 using testing::Return;
+using testing::SaveArg;
 
 // Those of the overall Mesos master/slave/scheduler/driver tests
 // that seem vaguely more slave than master-related are in this file.
@@ -1035,3 +1038,116 @@ TEST_F(SlaveTest, PingTimeoutSomePings)
   AWAIT_READY(detected);
   AWAIT_READY(slaveReregisteredMessage);
 }
+
+// This test ensures that a killTask() can happen between runTask()
+// and _runTask() and then gets "handled properly". This means that
+// the task never gets started, but also does not get lost. The end
+// result is status TASK_KILLED. Essentially, killing the task is
+// realized while preparing to start it. See MESOS-947.
+TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get());
+
+  MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
+  process::spawn(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(0);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .Times(0);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(0);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillRepeatedly(FutureArg<1>(&status));
+
+  EXPECT_CALL(slave, runTask(_, _, _, _, _))
+    .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask));
+
+  // Saved arguments from Slave::_runTask().
+  Future<bool> future;
+  FrameworkInfo frameworkInfo;
+  FrameworkID frameworkId;
+
+  // Skip what Slave::_runTask() normally does, save its arguments for
+  // later, and satisfy a future once the critical moment at which to
+  // kill the task has been reached.
+  Future<Nothing> _runTask;
+  EXPECT_CALL(slave, _runTask(_, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&_runTask),
+                    SaveArg<0>(&future),
+                    SaveArg<1>(&frameworkInfo),
+                    SaveArg<2>(&frameworkId)));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(_runTask);
+
+  Future<Nothing> killTask;
+  EXPECT_CALL(slave, killTask(_, _, _))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
+                    FutureSatisfy(&killTask)));
+  driver.killTask(task.task_id());
+
+  // Since this is the only task ever for this framework, the
+  // framework should get removed in Slave::killTask().
+  // Thus we can observe that this happens before Shutdown().
+  Future<Nothing> removeFramework;
+  EXPECT_CALL(slave, removeFramework(_))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
+                    FutureSatisfy(&removeFramework)));
+
+  AWAIT_READY(killTask);
+  slave.unmocked__runTask(
+      future, frameworkInfo, frameworkId, master.get(), task);
+
+  AWAIT_READY(removeFramework);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_KILLED, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  process::terminate(slave);
+  process::wait(slave);
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}