You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/04/23 20:44:43 UTC
git commit: Fixed MesosContainerizer to correctly update resources after recovery.
Repository: mesos
Updated Branches:
refs/heads/master 1f0c6ee15 -> 7228a4d9d
Fixed MesosContainerizer to correctly update resources after recovery.
Review: https://reviews.apache.org/r/20505
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7228a4d9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7228a4d9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7228a4d9
Branch: refs/heads/master
Commit: 7228a4d9d6cd0ec3234f9985d605eb0a81155508
Parents: 1f0c6ee
Author: Ian Downes <ia...@gmail.com>
Authored: Wed Apr 23 11:42:15 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Apr 23 11:42:15 2014 -0700
----------------------------------------------------------------------
src/slave/containerizer/mesos_containerizer.cpp | 5 +-
src/slave/slave.cpp | 52 +++++------
src/tests/slave_recovery_tests.cpp | 94 ++++++++++++++++++++
3 files changed, 124 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7228a4d9/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index 4a5dfa7..20e8d2e 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -739,7 +739,10 @@ Future<Nothing> MesosContainerizerProcess::update(
const ContainerID& containerId,
const Resources& _resources)
{
- if (!resources.contains(containerId)) {
+ // The resources hashmap won't initially contain the container's resources
+ // after recovery so we must check the promises hashmap (which is set during
+ // recovery).
+ if (!promises.contains(containerId)) {
return Failure("Unknown container: " + stringify(containerId));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7228a4d9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e710995..b673fd6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1588,6 +1588,21 @@ void Slave::registerExecutor(
}
+void _monitor( // Logs an error if starting resource monitoring failed or was discarded.
+ const Future<Nothing>& monitor,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const ContainerID& containerId)
+{
+ if (!monitor.isReady()) {
+ LOG(ERROR) << "Failed to monitor container '" << containerId
+ << "' for executor '" << executorId
+ << "' of framework '" << frameworkId
+ << "': " << (monitor.isFailed() ? monitor.failure() : "discarded");
+ }
+}
+
+
void Slave::reregisterExecutor(
const UPID& from,
const FrameworkID& frameworkId,
@@ -1669,6 +1684,17 @@ void Slave::reregisterExecutor(
CHECK_SOME(executor->resources);
containerizer->update(executor->containerId, executor->resources.get());
+ // Monitor the executor.
+ monitor.start(
+ executor->containerId,
+ executor->info,
+ flags.resource_monitoring_interval)
+ .onAny(lambda::bind(_monitor,
+ lambda::_1,
+ framework->id,
+ executor->id,
+ executor->containerId));
+
hashmap<TaskID, TaskInfo> unackedTasks;
foreach (const TaskInfo& task, tasks) {
unackedTasks[task.task_id()] = task;
@@ -2047,20 +2073,6 @@ ExecutorInfo Slave::getExecutorInfo(
}
-void _monitor(
- const Future<Nothing>& monitor,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const ContainerID& containerId)
-{
- if (!monitor.isReady()) {
- LOG(ERROR) << "Failed to monitor container '" << containerId
- << "' for executor '" << executorId
- << "' of framework '" << frameworkId
- << ":" << (monitor.isFailed() ? monitor.failure() : "discarded");
- }
-}
-
void Slave::executorLaunched(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
@@ -2707,17 +2719,6 @@ Future<Nothing> Slave::_recover()
{
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
- // Monitor the executor.
- monitor.start(
- executor->containerId,
- executor->info,
- flags.resource_monitoring_interval)
- .onAny(lambda::bind(_monitor,
- lambda::_1,
- framework->id,
- executor->id,
- executor->containerId));
-
// Set up callback for executor termination.
containerizer->wait(executor->containerId)
.onAny(defer(self(),
@@ -2726,7 +2727,6 @@ Future<Nothing> Slave::_recover()
executor->id,
lambda::_1));
-
if (flags.recover == "reconnect") {
if (executor->pid) {
LOG(INFO) << "Sending reconnect request to executor " << executor->id
http://git-wip-us.apache.org/repos/asf/mesos/blob/7228a4d9/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 72b6d42..3a062aa 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2976,3 +2976,97 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
delete containerizer3.get();
delete containerizer4.get();
}
+
+
+TYPED_TEST(SlaveRecoveryTest, ResourceStatistics)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = this->CreateSlaveFlags();
+
+ Try<Containerizer*> containerizer1 = Containerizer::create(flags, true);
+ ASSERT_SOME(containerizer1);
+
+ Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size()); // Fatal: offers.get()[0] is used below.
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task.
+
+ // Message expectations.
+ Future<Message> registerExecutor =
+ FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(registerExecutor);
+
+ this->Stop(slave.get());
+ delete containerizer1.get();
+
+ // Set up so we can wait until the new slave updates the container's
+ // resources (this occurs after the executor has re-registered).
+ // TODO(idownes): This assumes the containerizer is a MesosContainerizer.
+ Future<Nothing> update = FUTURE_DISPATCH(_, &MesosContainerizerProcess::update);
+
+ // Restart the slave (use same flags) with a new containerizer.
+ Try<Containerizer*> containerizer2 = Containerizer::create(flags, true);
+ ASSERT_SOME(containerizer2);
+
+ slave = this->StartSlave(containerizer2.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Wait until the containerizer is updated.
+ AWAIT_READY(update);
+
+ Future<hashset<ContainerID> > containers = containerizer2.get()->containers();
+ AWAIT_READY(containers);
+ ASSERT_EQ(1u, containers.get().size());
+
+ ContainerID containerId = *(containers.get().begin());
+
+ Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId);
+ AWAIT_READY(usage);
+
+ // Check the resource limits are set.
+ EXPECT_TRUE(usage.get().has_cpus_limit());
+ EXPECT_TRUE(usage.get().has_mem_limit_bytes());
+
+ Future<slave::Containerizer::Termination> wait =
+ containerizer2.get()->wait(containerId);
+
+ containerizer2.get()->destroy(containerId);
+
+ AWAIT_READY(wait);
+
+ driver.stop();
+ driver.join();
+
+ this->Shutdown();
+
+ delete containerizer2.get();
+}