You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/05 00:09:51 UTC
[30/43] git commit: Implemented DockerContainerizer::update.
Implemented DockerContainerizer::update.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a5d683b6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a5d683b6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a5d683b6
Branch: refs/heads/master
Commit: a5d683b6c5d4a7c8cc9608f6e57cb9cf7f172ba4
Parents: 81782c0
Author: Timothy Chen <tn...@gmail.com>
Authored: Mon Jun 30 15:33:00 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 15:08:17 2014 -0700
----------------------------------------------------------------------
src/linux/cgroups.cpp | 19 ++++
src/linux/cgroups.hpp | 5 +
src/slave/containerizer/docker.cpp | 144 +++++++++++++++++++++++++-
src/slave/containerizer/docker.hpp | 2 +
src/tests/docker_containerizer_tests.cpp | 120 +++++++++++++++++++++
5 files changed, 286 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index ccb86cf..39a4874 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1846,6 +1846,25 @@ Try<Nothing> shares(
}
+// Reads the 'cpu.shares' control file of 'cgroup' within 'hierarchy' and
+// returns its value. Returns an Error if the control file cannot be read
+// or if its contents cannot be parsed as an unsigned integer.
+Try<uint64_t> shares(
+    const string& hierarchy,
+    const string& cgroup)
+{
+  Try<string> read = cgroups::read(hierarchy, cgroup, "cpu.shares");
+
+  if (read.isError()) {
+    return Error(read.error());
+  }
+
+  uint64_t shares = 0;
+  std::istringstream ss(read.get());
+
+  ss >> shares;
+
+  // Without this check a failed extraction would be reported to the
+  // caller as a successful read of a meaningless value.
+  if (ss.fail()) {
+    return Error("Failed to parse 'cpu.shares': '" + read.get() + "'");
+  }
+
+  return shares;
+}
+
+
Try<Nothing> cfs_period_us(
const string& hierarchy,
const string& cgroup,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index c571e91..9dfba6e 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -392,6 +392,11 @@ Try<Nothing> shares(
const std::string& cgroup,
uint64_t shares);
+// Returns the value of 'cpu.shares' for the cgroup, or an Error if reading fails.
+Try<uint64_t> shares(
+ const std::string& hierarchy,
+ const std::string& cgroup);
+
// Returns the cpu shares from cpu.shares.
Try<uint64_t> shares(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 7cd2d7d..5a68d94 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -30,12 +30,19 @@
#include "docker/docker.hpp"
+#ifdef __linux__
+#include "linux/cgroups.hpp"
+#endif // __linux__
+
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/isolators/cgroups/mem.hpp"
+
#include "usage/usage.hpp"
@@ -134,6 +141,11 @@ private:
const bool& killed,
const Future<Option<int > >& status);
+ // Continuation of update(): applies 'resources' to the container's
+ // cgroups using the result of inspecting the Docker container.
+ process::Future<Nothing> _update(
+ const ContainerID& containerId,
+ const Resources& resources,
+ const Future<Docker::Container>& future);
+
Future<ResourceStatistics> _usage(
const ContainerID& containerId,
const Docker::Container& container);
@@ -169,6 +181,30 @@ private:
};
+// Calls cgroups::prepare() for the "docker" cgroup in each of the cpu,
+// cpuacct and memory subsystems under flags.cgroups_hierarchy. Returns an
+// Error naming the first subsystem that could not be prepared. On
+// non-Linux builds this does nothing and returns Nothing().
+Try<Nothing> DockerContainerizer::prepareCgroups(const Flags& flags)
+{
+#ifdef __linux__
+  // We're assuming docker is under cgroup directory "docker".
+  const string root = "docker";
+
+  const char* names[] = { "cpu", "cpuacct", "memory" };
+  std::vector<string> required(
+      names, names + sizeof(names) / sizeof(names[0]));
+
+  foreach (const string& name, required) {
+    Try<string> prepared =
+      cgroups::prepare(flags.cgroups_hierarchy, name, root);
+
+    if (prepared.isError()) {
+      const string message =
+        "Failed to prepare cgroup hierarchy " + flags.cgroups_hierarchy +
+        " subsystem '" + name + "' for Docker: " + prepared.error();
+
+      return Error(message);
+    }
+  }
+#endif // __linux__
+  return Nothing();
+}
+
+
Try<DockerContainerizer*> DockerContainerizer::create(
const Flags& flags,
bool local)
@@ -179,6 +215,11 @@ Try<DockerContainerizer*> DockerContainerizer::create(
return Error(validation.error());
}
+ Try<Nothing> prepare = prepareCgroups(flags);
+ if (prepare.isError()) {
+ return Error(prepare.error());
+ }
+
return new DockerContainerizer(flags, local, docker);
}
@@ -620,11 +661,106 @@ Future<bool> DockerContainerizerProcess::_launch(
Future<Nothing> DockerContainerizerProcess::update(
const ContainerID& containerId,
- const Resources& resources)
+    const Resources& _resources)
+{
+  // Containers we are not tracking are ignored rather than failed.
+  if (!promises.contains(containerId)) {
+    LOG(WARNING)
+      << "Ignoring updating unknown container: "
+      << containerId.value();
+    return Nothing();
+  }
+
+#ifdef __linux__
+  // Only cpus and mem are applied; bail out if neither is present.
+  const bool supported =
+    _resources.cpus().isSome() || _resources.mem().isSome();
+
+  if (!supported) {
+    LOG(WARNING) << "Ignoring update as no supported resources are present";
+    return Nothing();
+  }
+
+  // Remember the latest resources so that usage() can refer to them.
+  resources.put(containerId, _resources);
+
+  // The cgroup writes happen in _update() once the inspect completes.
+  const string name = DOCKER_NAME_PREFIX + stringify(containerId);
+
+  return docker.inspect(name)
+    .then(defer(self(), &Self::_update, containerId, _resources, lambda::_1));
+#else
+  return Nothing();
+#endif // __linux__
+}
+
+
+// Continuation of update(), invoked with the result of inspecting the
+// Docker container. Writes the requested cpu and memory limits into the
+// container's cgroups, addressed as "docker/<container id>" under the
+// "cpu" and "memory" directories of flags.cgroups_hierarchy. Returns a
+// Failure if a cgroup control cannot be read or written. On non-Linux
+// builds no cgroups are touched.
+Future<Nothing> DockerContainerizerProcess::_update(
+    const ContainerID& containerId,
+    const Resources& _resources,
+    const Future<Docker::Container>& future)
{
- // TODO(benh): Right now we're only launching tasks so we don't
- // expect the containers to be resized. This will need to get
- // implemented to support executors.
+  // The container may have been removed while the inspect was in
+  // flight; handle that the same way update() handles an unknown
+  // container instead of failing on cgroups that no longer exist.
+  if (!promises.contains(containerId)) {
+    LOG(INFO)
+      << "Ignoring update of container " << containerId
+      << " as it no longer exists";
+    return Nothing();
+  }
+
+#ifdef __linux__
+  const string id = path::join("docker", future.get().id());
+
+  // Update CPU shares.
+  if (_resources.cpus().isSome()) {
+    double cpuShares = _resources.cpus().get();
+
+    // Scale cpus to shares, never going below the minimum.
+    uint64_t shares = std::max(
+        static_cast<uint64_t>(CPU_SHARES_PER_CPU * cpuShares),
+        MIN_CPU_SHARES);
+
+    Try<Nothing> write =
+      cgroups::cpu::shares(
+          path::join(flags.cgroups_hierarchy, "cpu"), id, shares);
+
+    if (write.isError()) {
+      return Failure("Failed to update 'cpu.shares': " + write.error());
+    }
+
+    LOG(INFO)
+      << "Updated 'cpu.shares' to " << shares
+      << " for container " << containerId;
+  }
+
+  // Update Memory.
+  if (_resources.mem().isSome()) {
+    Bytes mem = _resources.mem().get();
+    Bytes limit = std::max(mem, MIN_MEMORY);
+
+    std::string memHierarchy =
+      path::join(flags.cgroups_hierarchy, "memory");
+
+    // Always set the soft limit.
+    Try<Nothing> write =
+      cgroups::memory::soft_limit_in_bytes(memHierarchy, id, limit);
+
+    if (write.isError()) {
+      return Failure("Failed to set 'memory.soft_limit_in_bytes': " +
+                     write.error());
+    }
+
+    LOG(INFO)
+      << "Updated 'memory.soft_limit_in_bytes' to " << limit
+      << " for container " << containerId;
+
+    // Read the existing limit.
+    Try<Bytes> currentLimit =
+      cgroups::memory::limit_in_bytes(memHierarchy, id);
+
+    if (currentLimit.isError()) {
+      return Failure("Failed to read 'memory.limit_in_bytes': " +
+                     currentLimit.error());
+    }
+
+    // Only update if new limit is higher; the hard limit is never
+    // lowered here.
+    if (limit > currentLimit.get()) {
+      write = cgroups::memory::limit_in_bytes(memHierarchy, id, limit);
+
+      if (write.isError()) {
+        return Failure("Failed to set 'memory.limit_in_bytes': " +
+                       write.error());
+      }
+
+      LOG(INFO)
+        << "Updated 'memory.limit_in_bytes' to " << limit
+        << " for container " << containerId;
+    }
+  }
+#endif // __linux__
+
return Nothing();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 1a5d1c2..f4eb0ff 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -45,6 +45,8 @@ public:
const Flags& flags,
bool local);
+ // Calls cgroups::prepare() for the "docker" cgroup in the cpu, cpuacct
+ // and memory subsystems under flags.cgroups_hierarchy; does nothing on
+ // non-Linux builds.
+ static Try<Nothing> prepareCgroups(const Flags& flags);
+
DockerContainerizer(
const Flags& flags,
bool local,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index b0b8b39..cd1c88c 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -22,6 +22,8 @@
#include <process/future.hpp>
#include <process/subprocess.hpp>
+#include "linux/cgroups.hpp"
+
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
@@ -62,6 +64,7 @@ public:
const Docker& docker)
: DockerContainerizer(flags, local, docker)
{
+ DockerContainerizer::prepareCgroups(flags);
EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
.WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch));
}
@@ -317,6 +320,123 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage)
}
+#ifdef __linux__
+// Launches a Docker task, resizes its container through update() and
+// verifies that the new values were written to the container's
+// 'cpu.shares' and 'memory.soft_limit_in_bytes' cgroup controls.
+TEST_F(DockerContainerizerTest, DOCKER_Update)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Docker docker(tests::flags.docker);
+
+  MockDockerContainerizer dockerContainerizer(flags, true, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+
+  // Must be fatal: the first offer is dereferenced right below.
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  CommandInfo::ContainerInfo* containerInfo = command.mutable_container();
+  containerInfo->set_image("docker://busybox");
+  command.set_value("sleep 180");
+
+  task.mutable_command()->CopyFrom(command);
+
+  Future<TaskStatus> statusRunning;
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Capture the container id chosen by the slave while still
+  // delegating to the real launch implementation.
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(containerId);
+
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  string containerName = slave::DOCKER_NAME_PREFIX + containerId.get().value();
+  Future<Docker::Container> container = docker.inspect(containerName);
+
+  AWAIT_READY(container);
+
+  Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
+
+  ASSERT_SOME(newResources);
+
+  Future<Nothing> update =
+    dockerContainerizer.update(containerId.get(), newResources.get());
+
+  AWAIT_READY(update);
+
+  // Read the values back from the container's cgroups.
+  string id = path::join("docker", container.get().id());
+
+  Try<Bytes> mem =
+    cgroups::memory::soft_limit_in_bytes(
+        path::join(flags.cgroups_hierarchy, "memory"), id);
+  ASSERT_SOME(mem);
+
+  Try<uint64_t> cpu =
+    cgroups::cpu::shares(
+        path::join(flags.cgroups_hierarchy, "cpu"), id);
+
+  ASSERT_SOME(cpu);
+
+  // Unsigned literals to match the unsigned values being compared.
+  EXPECT_EQ(1024u, cpu.get());
+  EXPECT_EQ(128u, mem.get().megabytes());
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  dockerContainerizer.destroy(containerId.get());
+
+  AWAIT_READY(termination);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+#endif //__linux__
+
+
TEST_F(DockerContainerizerTest, DOCKER_Recover)
{
slave::Flags flags = CreateSlaveFlags();