You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:50:06 UTC

[22/23] git commit: Fixed slave to remove a queued task when the task is terminated.

Fixed slave to remove a queued task when the task is terminated.

Review: https://reviews.apache.org/r/13207


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/88b589cc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/88b589cc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/88b589cc

Branch: refs/heads/vinod/0.13.0
Commit: 88b589cc8a3cbacfc764e1629a4bff0622238d4b
Parents: 9378462
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 1 17:03:04 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Aug 5 11:26:33 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp        |  1 +
 src/tests/master_tests.cpp | 79 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 80 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/88b589cc/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c40a9ac..20d76e2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2954,6 +2954,7 @@ void Executor::terminateTask(
   if (queuedTasks.contains(taskId)) {
     task = new Task(
         protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+    queuedTasks.erase(taskId);
   } else if (launchedTasks.contains(taskId)) {
     // Update the resources if it's been launched.
     task = launchedTasks[taskId];

http://git-wip-us.apache.org/repos/asf/mesos/blob/88b589cc/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5ac4d5f..c503842 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -40,6 +40,7 @@
 #include "master/master.hpp"
 
 #include "slave/constants.hpp"
+#include "slave/gc.hpp"
 #include "slave/flags.hpp"
 #include "slave/process_isolator.hpp"
 #include "slave/slave.hpp"
@@ -53,6 +54,7 @@ using namespace mesos::internal::tests;
 
 using mesos::internal::master::Master;
 
+using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::Isolator;
 using mesos::internal::slave::ProcessIsolator;
 using mesos::internal::slave::Slave;
@@ -755,6 +757,83 @@ TEST_F(MasterTest, ShutdownUnregisteredExecutor)
 }
 
 
+// Verifies that an executor killed before it registers with the slave
+// results in a TASK_LOST update and a dispatch to the garbage collector.
+TEST_F(MasterTest, RemoveUnregisteredTerminatedExecutor)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&isolator);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Only the first offer is captured; ignore the rest.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Drop the RegisterExecutorMessage sent from the executor to the slave.
+  Future<process::Message> registerExecutorMessage =
+    DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(registerExecutorMessage);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> schedule =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+  // Now kill the executor by dispatching 'killExecutor' to the isolator.
+  dispatch(isolator,
+           &Isolator::killExecutor,
+           offers.get()[0].framework_id(),
+           DEFAULT_EXECUTOR_ID);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  // A dispatch to 'GarbageCollectorProcess::schedule' is used as the
+  // signal that the slave has cleaned up the executor.
+  AWAIT_READY(schedule);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shut down before 'isolator' gets deallocated.
+}
+
+
 TEST_F(MasterTest, MasterInfo)
 {
   Try<PID<Master> > master = StartMaster();