You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/04/23 20:44:43 UTC

git commit: Fixed MesosContainerizer to correctly update resources after recovery.

Repository: mesos
Updated Branches:
  refs/heads/master 1f0c6ee15 -> 7228a4d9d


Fixed MesosContainerizer to correctly update resources after recovery.

Review: https://reviews.apache.org/r/20505


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7228a4d9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7228a4d9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7228a4d9

Branch: refs/heads/master
Commit: 7228a4d9d6cd0ec3234f9985d605eb0a81155508
Parents: 1f0c6ee
Author: Ian Downes <ia...@gmail.com>
Authored: Wed Apr 23 11:42:15 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Apr 23 11:42:15 2014 -0700

----------------------------------------------------------------------
 src/slave/containerizer/mesos_containerizer.cpp |  5 +-
 src/slave/slave.cpp                             | 52 +++++------
 src/tests/slave_recovery_tests.cpp              | 94 ++++++++++++++++++++
 3 files changed, 124 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7228a4d9/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index 4a5dfa7..20e8d2e 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -739,7 +739,10 @@ Future<Nothing> MesosContainerizerProcess::update(
     const ContainerID& containerId,
     const Resources& _resources)
 {
-  if (!resources.contains(containerId)) {
+  // The resources hashmap won't initially contain the container's resources
+  // after recovery so we must check the promises hashmap (which is set during
+  // recovery).
+  if (!promises.contains(containerId)) {
     return Failure("Unknown container: " + stringify(containerId));
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7228a4d9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e710995..b673fd6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1588,6 +1588,21 @@ void Slave::registerExecutor(
 }
 
 
+void _monitor(
+    const Future<Nothing>& monitor,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const ContainerID& containerId)
+{
+  if (!monitor.isReady()) {
+    LOG(ERROR) << "Failed to monitor container '" << containerId
+               << "' for executor '" << executorId
+               << "' of framework '" << frameworkId
+               << ":" << (monitor.isFailed() ? monitor.failure() : "discarded");
+  }
+}
+
+
 void Slave::reregisterExecutor(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -1669,6 +1684,17 @@ void Slave::reregisterExecutor(
       CHECK_SOME(executor->resources);
       containerizer->update(executor->containerId, executor->resources.get());
 
+      // Monitor the executor.
+      monitor.start(
+          executor->containerId,
+          executor->info,
+          flags.resource_monitoring_interval)
+        .onAny(lambda::bind(_monitor,
+                            lambda::_1,
+                            framework->id,
+                            executor->id,
+                            executor->containerId));
+
       hashmap<TaskID, TaskInfo> unackedTasks;
       foreach (const TaskInfo& task, tasks) {
         unackedTasks[task.task_id()] = task;
@@ -2047,20 +2073,6 @@ ExecutorInfo Slave::getExecutorInfo(
 }
 
 
-void _monitor(
-    const Future<Nothing>& monitor,
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId,
-    const ContainerID& containerId)
-{
-  if (!monitor.isReady()) {
-    LOG(ERROR) << "Failed to monitor container '" << containerId
-               << "' for executor '" << executorId
-               << "' of framework '" << frameworkId
-               << ":" << (monitor.isFailed() ? monitor.failure() : "discarded");
-  }
-}
-
 void Slave::executorLaunched(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
@@ -2707,17 +2719,6 @@ Future<Nothing> Slave::_recover()
 {
   foreachvalue (Framework* framework, frameworks) {
     foreachvalue (Executor* executor, framework->executors) {
-      // Monitor the executor.
-      monitor.start(
-          executor->containerId,
-          executor->info,
-          flags.resource_monitoring_interval)
-        .onAny(lambda::bind(_monitor,
-                            lambda::_1,
-                            framework->id,
-                            executor->id,
-                            executor->containerId));
-
       // Set up callback for executor termination.
       containerizer->wait(executor->containerId)
         .onAny(defer(self(),
@@ -2726,7 +2727,6 @@ Future<Nothing> Slave::_recover()
                      executor->id,
                      lambda::_1));
 
-
       if (flags.recover == "reconnect") {
         if (executor->pid) {
           LOG(INFO) << "Sending reconnect request to executor " << executor->id

http://git-wip-us.apache.org/repos/asf/mesos/blob/7228a4d9/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 72b6d42..3a062aa 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2976,3 +2976,97 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   delete containerizer3.get();
   delete containerizer4.get();
 }
+
+
+TYPED_TEST(SlaveRecoveryTest, ResourceStatistics)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Try<Containerizer*> containerizer1 = Containerizer::create(flags, true);
+  ASSERT_SOME(containerizer1);
+
+  Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task.
+
+  // Message expectations.
+  Future<Message> registerExecutor =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(registerExecutor);
+
+  this->Stop(slave.get());
+  delete containerizer1.get();
+
+  // Set up so we can wait until the new slave updates the container's
+  // resources (this occurs after the executor has re-registered).
+  // TODO(idownes): This assumes the containerizer is a MesosContainerizer.
+  Future<Nothing> update = FUTURE_DISPATCH(_, &MesosContainerizerProcess::update);
+
+  // Restart the slave (use same flags) with a new containerizer.
+  Try<Containerizer*> containerizer2 = Containerizer::create(flags, true);
+  ASSERT_SOME(containerizer2);
+
+  slave = this->StartSlave(containerizer2.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Wait until the containerizer is updated.
+  AWAIT_READY(update);
+
+  Future<hashset<ContainerID> > containers = containerizer2.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+
+  ContainerID containerId = *(containers.get().begin());
+
+  Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId);
+  AWAIT_READY(usage);
+
+  // Check the resource limits are set.
+  EXPECT_TRUE(usage.get().has_cpus_limit());
+  EXPECT_TRUE(usage.get().has_mem_limit_bytes());
+
+  Future<slave::Containerizer::Termination> wait =
+    containerizer2.get()->wait(containerId);
+
+  containerizer2.get()->destroy(containerId);
+
+  AWAIT_READY(wait);
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+
+  delete containerizer2.get();
+}