You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:50:06 UTC
[22/23] git commit: Fixed slave to remove a queued task when the task
is terminated.
Fixed slave to remove a queued task when the task is terminated.
Review: https://reviews.apache.org/r/13207
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/88b589cc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/88b589cc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/88b589cc
Branch: refs/heads/vinod/0.13.0
Commit: 88b589cc8a3cbacfc764e1629a4bff0622238d4b
Parents: 9378462
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 1 17:03:04 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Aug 5 11:26:33 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 1 +
src/tests/master_tests.cpp | 79 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 80 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/88b589cc/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c40a9ac..20d76e2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2954,6 +2954,7 @@ void Executor::terminateTask(
if (queuedTasks.contains(taskId)) {
task = new Task(
protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+ queuedTasks.erase(taskId);
} else if (launchedTasks.contains(taskId)) {
// Update the resources if it's been launched.
task = launchedTasks[taskId];
http://git-wip-us.apache.org/repos/asf/mesos/blob/88b589cc/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5ac4d5f..c503842 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -40,6 +40,7 @@
#include "master/master.hpp"
#include "slave/constants.hpp"
+#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/process_isolator.hpp"
#include "slave/slave.hpp"
@@ -53,6 +54,7 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
+using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Isolator;
using mesos::internal::slave::ProcessIsolator;
using mesos::internal::slave::Slave;
@@ -755,6 +757,83 @@ TEST_F(MasterTest, ShutdownUnregisteredExecutor)
}
+// This test verifies that when an executor terminates before registering
+// with the slave, its task is reported as lost and the executor is cleaned up.
+TEST_F(MasterTest, RemoveUnregisteredTerminatedExecutor)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestingIsolator isolator(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&isolator);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Only the first offer is captured; ignore the rest.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Drop the RegisterExecutorMessage sent from the executor to the slave.
+ Future<process::Message> registerExecutorMessage =
+ DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(registerExecutorMessage);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> schedule =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+ // Now kill the executor by dispatching 'killExecutor' to the isolator.
+ dispatch(isolator,
+ &Isolator::killExecutor,
+ offers.get()[0].framework_id(),
+ DEFAULT_EXECUTOR_ID);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+
+ // A dispatch to 'GarbageCollectorProcess::schedule' is used as the signal
+ // that the executor has been cleaned up by the slave.
+ AWAIT_READY(schedule);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
+
+
TEST_F(MasterTest, MasterInfo)
{
Try<PID<Master> > master = StartMaster();