You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/05 00:09:51 UTC

[30/43] git commit: Implemented DockerContainerizer::update.

Implemented DockerContainerizer::update.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a5d683b6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a5d683b6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a5d683b6

Branch: refs/heads/master
Commit: a5d683b6c5d4a7c8cc9608f6e57cb9cf7f172ba4
Parents: 81782c0
Author: Timothy Chen <tn...@gmail.com>
Authored: Mon Jun 30 15:33:00 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 15:08:17 2014 -0700

----------------------------------------------------------------------
 src/linux/cgroups.cpp                    |  19 ++++
 src/linux/cgroups.hpp                    |   5 +
 src/slave/containerizer/docker.cpp       | 144 +++++++++++++++++++++++++-
 src/slave/containerizer/docker.hpp       |   2 +
 src/tests/docker_containerizer_tests.cpp | 120 +++++++++++++++++++++
 5 files changed, 286 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index ccb86cf..39a4874 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1846,6 +1846,25 @@ Try<Nothing> shares(
 }
 
 
+Try<uint64_t> shares(
+    const string& hierarchy,
+    const string& cgroup)
+{
+  Try<string> read = cgroups::read(hierarchy, cgroup, "cpu.shares");
+
+  if (read.isError()) {
+    return Error(read.error());
+  }
+
+  uint64_t shares;
+  std::istringstream ss(read.get());
+
+  ss >> shares;
+
+  return shares;
+}
+
+
 Try<Nothing> cfs_period_us(
     const string& hierarchy,
     const string& cgroup,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index c571e91..9dfba6e 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -392,6 +392,11 @@ Try<Nothing> shares(
     const std::string& cgroup,
     uint64_t shares);
 
+// Returns the cpu shares from cpu.shares.
+Try<uint64_t> shares(
+    const std::string& hierarchy,
+    const std::string& cgroup);
+
 
 // Returns the cpu shares from cpu.shares.
 Try<uint64_t> shares(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 7cd2d7d..5a68d94 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -30,12 +30,19 @@
 
 #include "docker/docker.hpp"
 
+#ifdef __linux__
+#include "linux/cgroups.hpp"
+#endif // __linux__
+
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/docker.hpp"
 
+#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/isolators/cgroups/mem.hpp"
+
 #include "usage/usage.hpp"
 
 
@@ -134,6 +141,11 @@ private:
       const bool& killed,
       const Future<Option<int > >& status);
 
+  process::Future<Nothing> _update(
+      const ContainerID& containerId,
+      const Resources& resources,
+      const Future<Docker::Container>& future);
+
   Future<ResourceStatistics> _usage(
     const ContainerID& containerId,
     const Docker::Container& container);
@@ -169,6 +181,30 @@ private:
 };
 
 
+Try<Nothing> DockerContainerizer::prepareCgroups(const Flags& flags)
+{
+#ifdef __linux__
+  std::vector<string> subsystems;
+  subsystems.push_back("cpu");
+  subsystems.push_back("cpuacct");
+  subsystems.push_back("memory");
+
+  foreach (const string& subsystem, subsystems) {
+    // We're assuming docker is under cgroup directory "docker".
+    Try<string> hierarchy =
+      cgroups::prepare(flags.cgroups_hierarchy, subsystem, "docker");
+
+    if (hierarchy.isError()) {
+      return Error(
+          "Failed to prepare cgroup hierarchy " + flags.cgroups_hierarchy +
+          " subsystem '" + subsystem + "' for Docker: " + hierarchy.error());
+    }
+  }
+#endif // __linux__
+  return Nothing();
+}
+
+
 Try<DockerContainerizer*> DockerContainerizer::create(
     const Flags& flags,
     bool local)
@@ -179,6 +215,11 @@ Try<DockerContainerizer*> DockerContainerizer::create(
     return Error(validation.error());
   }
 
+  Try<Nothing> prepare = prepareCgroups(flags);
+  if (prepare.isError()) {
+    return Error(prepare.error());
+  }
+
   return new DockerContainerizer(flags, local, docker);
 }
 
@@ -620,11 +661,106 @@ Future<bool> DockerContainerizerProcess::_launch(
 
 Future<Nothing> DockerContainerizerProcess::update(
     const ContainerID& containerId,
-    const Resources& resources)
+    const Resources& _resources)
+{
+  if (!promises.contains(containerId)) {
+    LOG(WARNING)
+      << "Ignoring updating unknown container: "
+      << containerId.value();
+    return Nothing();
+  }
+
+#ifdef __linux__
+  if (!_resources.cpus().isSome() && !_resources.mem().isSome()) {
+    LOG(WARNING) << "Ignoring update as no supported resources are present";
+    return Nothing();
+  }
+
+  // Store the resources for usage()
+  resources.put(containerId, _resources);
+
+  return docker.inspect(DOCKER_NAME_PREFIX + stringify(containerId))
+    .then(defer(self(), &Self::_update, containerId, _resources, lambda::_1));
+#else
+  return Nothing();
+#endif // __linux__
+}
+
+
+Future<Nothing> DockerContainerizerProcess::_update(
+    const ContainerID& containerId,
+    const Resources& _resources,
+    const Future<Docker::Container>& future)
 {
-  // TODO(benh): Right now we're only launching tasks so we don't
-  // expect the containers to be resized. This will need to get
-  // implemented to support executors.
+#ifdef __linux__
+  const string& id = path::join("docker", future.get().id());
+
+  // Update CPU shares.
+  if (_resources.cpus().isSome()) {
+    double cpuShares = _resources.cpus().get();
+
+    uint64_t shares =
+      std::max((uint64_t) (CPU_SHARES_PER_CPU * cpuShares), MIN_CPU_SHARES);
+
+    Try<Nothing> write =
+      cgroups::cpu::shares(
+          path::join(flags.cgroups_hierarchy, "cpu"), id, shares);
+
+    if (write.isError()) {
+      return Failure("Failed to update 'cpu.shares': " + write.error());
+    }
+
+    LOG(INFO)
+      << "Updated 'cpu.shares' to " << shares
+      << " for container " << containerId;
+  }
+
+  // Update Memory.
+  if (_resources.mem().isSome()) {
+    Bytes mem = _resources.mem().get();
+    Bytes limit = std::max(mem, MIN_MEMORY);
+
+    std::string memHierarchy =
+      path::join(flags.cgroups_hierarchy, "memory");
+
+    // Always set the soft limit.
+    Try<Nothing> write =
+      cgroups::memory::soft_limit_in_bytes(memHierarchy, id, limit);
+
+    if (write.isError()) {
+      return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
+          write.error());
+    }
+
+    LOG(INFO)
+      << "Updated 'memory.soft_limit_in_bytes' to " << limit
+      << " for container " << containerId;
+
+    // Read the existing limit.
+    Try<Bytes> currentLimit =
+      cgroups::memory::limit_in_bytes(memHierarchy, id);
+
+    if (currentLimit.isError()) {
+      return Failure("Failed to read 'memory.limit_in_bytes': " +
+          currentLimit.error());
+    }
+
+    // Only update if new limit is higher.
+    if (limit > currentLimit.get()) {
+      write = cgroups::memory::limit_in_bytes(memHierarchy, id, limit);
+
+      if (write.isError()) {
+        return Failure("Failed to set 'memory.limit_in_bytes': " +
+            write.error());
+      }
+
+      LOG(INFO)
+        << "Updated 'memory.limit_in_bytes' to " << limit
+        << " for container " << containerId;
+    }
+  }
+#endif // __linux__
+
   return Nothing();
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 1a5d1c2..f4eb0ff 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -45,6 +45,8 @@ public:
       const Flags& flags,
       bool local);
 
+  static Try<Nothing> prepareCgroups(const Flags& flags);
+
   DockerContainerizer(
       const Flags& flags,
       bool local,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index b0b8b39..cd1c88c 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -22,6 +22,8 @@
 #include <process/future.hpp>
 #include <process/subprocess.hpp>
 
+#include "linux/cgroups.hpp"
+
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
@@ -62,6 +64,7 @@ public:
       const Docker& docker)
     : DockerContainerizer(flags, local, docker)
   {
+    DockerContainerizer::prepareCgroups(flags);
     EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
       .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch));
   }
@@ -317,6 +320,123 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage)
 }
 
 
+#ifdef __linux__
+TEST_F(DockerContainerizerTest, DOCKER_Update)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Docker docker(tests::flags.docker);
+
+  MockDockerContainerizer dockerContainerizer(flags, true, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  CommandInfo::ContainerInfo* containerInfo = command.mutable_container();
+  containerInfo->set_image("docker://busybox");
+  command.set_value("sleep 180");
+
+  task.mutable_command()->CopyFrom(command);
+
+  Future<TaskStatus> statusRunning;
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+         Invoke(&dockerContainerizer,
+                &MockDockerContainerizer::_launch)));
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  string containerName = slave::DOCKER_NAME_PREFIX + containerId.get().value();
+  Future<Docker::Container> container = docker.inspect(containerName);
+
+  AWAIT_READY(container);
+
+  Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
+
+  ASSERT_SOME(newResources);
+
+  Future<Nothing> update =
+    dockerContainerizer.update(containerId.get(), newResources.get());
+
+  AWAIT_READY(update);
+
+  string id = path::join("docker", container.get().id());
+
+  Try<Bytes> mem =
+    cgroups::memory::soft_limit_in_bytes(
+        path::join(flags.cgroups_hierarchy, "memory"), id);
+  ASSERT_SOME(mem);
+
+  Try<uint64_t> cpu =
+    cgroups::cpu::shares(
+        path::join(flags.cgroups_hierarchy, "cpu"), id);
+
+  ASSERT_SOME(cpu);
+
+  EXPECT_EQ(1024, cpu.get());
+  EXPECT_EQ(128, mem.get().megabytes());
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  dockerContainerizer.destroy(containerId.get());
+
+  AWAIT_READY(termination);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+#endif //__linux__
+
+
 TEST_F(DockerContainerizerTest, DOCKER_Recover)
 {
   slave::Flags flags = CreateSlaveFlags();