You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/05 00:09:46 UTC
[25/43] git commit: Refactored docker::usage() and docker::run().
Refactored docker::usage() and docker::run().
Support resources restriction in docker::run().
Support resources display in docker::usage().
Also fixed docker::usage() to check for destroying containers
before calling docker::inspect().
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ee998e41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ee998e41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ee998e41
Branch: refs/heads/master
Commit: ee998e410b6b37f26aa0f871daf6c6ad571da18c
Parents: 9c94cce
Author: Yifan Gu <gu...@gmail.com>
Authored: Sat Jun 28 16:54:10 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Aug 4 15:08:16 2014 -0700
----------------------------------------------------------------------
src/docker/docker.cpp | 37 +++++++++++++----
src/docker/docker.hpp | 6 ++-
src/slave/containerizer/docker.cpp | 42 ++++++++++++++-----
src/tests/docker_containerizer_tests.cpp | 60 +++++++++++++++++++--------
src/tests/docker_tests.cpp | 4 +-
5 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ee998e41/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index fb7f0e9..8985f55 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -89,9 +89,11 @@ Option<pid_t> Docker::Container::pid() const
map<string, JSON::Value>::const_iterator entry =
value.as<JSON::Object>().values.find("Pid");
CHECK(entry != json.values.end());
- value = entry->second;
- CHECK(value.is<JSON::Number>());
- pid_t pid = pid_t(value.as<JSON::Number>().value);
+ // TODO(yifan) reload operator '=' to reuse the value variable above.
+ JSON::Value pidValue = entry->second;
+ CHECK(pidValue.is<JSON::Number>());
+
+ pid_t pid = pid_t(pidValue.as<JSON::Number>().value);
if (pid == 0) {
return None();
}
@@ -101,13 +103,33 @@ Option<pid_t> Docker::Container::pid() const
Future<Option<int> > Docker::run(
const string& image,
const string& command,
- const string& name) const
+ const string& name,
+ const mesos::Resources& resources) const
{
- VLOG(1) << "Running " << path << " run -d --name=" << name << " "
- << image << " " << command;
+ CHECK(resources.size() != 0);
+
+ string cmd = " run -d";
+
+ // TODO(yifan): Support other resources (e.g. disk, ports).
+ Option<double> cpus = resources.cpus();
+ if (cpus.isSome()) {
+ uint64_t cpuShare =
+ std::max((uint64_t) (CPU_SHARES_PER_CPU * cpus.get()), MIN_CPU_SHARES);
+ cmd += " -c " + stringify(cpuShare);
+ }
+
+ Option<Bytes> mem = resources.mem();
+ if (mem.isSome()) {
+ Bytes memLimit = std::max(mem.get(), MIN_MEMORY);
+ cmd += " -m " + stringify(memLimit.bytes());
+ }
+
+ cmd += " --name=" + name + " " + image + " " + command;
+
+ VLOG(1) << "Running " << path << cmd;
Try<Subprocess> s = subprocess(
- path + " run -d --name=" + name + " " + image + " " + command,
+ path + cmd,
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE());
@@ -115,7 +137,6 @@ Future<Option<int> > Docker::run(
if (s.isError()) {
return Failure(s.error());
}
-
return s.get().status();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ee998e41/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 89840af..912859c 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -30,6 +30,9 @@
#include <stout/nothing.hpp>
#include <stout/option.hpp>
+#include "mesos/resources.hpp"
+
+
// Abstraction for working with Docker (modeled on CLI).
class Docker
{
@@ -63,7 +66,8 @@ public:
process::Future<Option<int> > run(
const std::string& image,
const std::string& command,
- const std::string& name) const;
+ const std::string& name,
+ const mesos::Resources& resources) const;
// Performs 'docker kill CONTAINER'.
process::Future<Option<int> > kill(
http://git-wip-us.apache.org/repos/asf/mesos/blob/ee998e41/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 38b2a03..7cd2d7d 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -136,7 +136,7 @@ private:
Future<ResourceStatistics> _usage(
const ContainerID& containerId,
- const Future<Docker::Container> container);
+ const Docker::Container& container);
// Call back for when the executor exits. This will trigger
// container destroy.
@@ -510,7 +510,7 @@ Future<bool> DockerContainerizerProcess::launch(
// Start a docker container then launch the executor (but destroy
// the Docker container if launching the executor failed).
- return docker.run(image, command.value(), name)
+ return docker.run(image, command.value(), name, taskInfo.resources())
.then(defer(self(),
&Self::_launch,
containerId,
@@ -634,33 +634,55 @@ Future<ResourceStatistics> DockerContainerizerProcess::usage(
{
#ifndef __linux__
return Failure("Does not support usage() on non-linux platform");
-#endif // __linux__
-
+#else
if (!promises.contains(containerId)) {
return Failure("Unknown container: " + stringify(containerId));
}
+ if (destroying.contains(containerId)) {
+ return Failure("Container is being removed: " + stringify(containerId));
+ }
+
// Construct the Docker container name.
string name = DOCKER_NAME_PREFIX + stringify(containerId);
return docker.inspect(name)
.then(defer(self(), &Self::_usage, containerId, lambda::_1));
+#endif // __linux__
}
Future<ResourceStatistics> DockerContainerizerProcess::_usage(
const ContainerID& containerId,
- const Future<Docker::Container> container)
+ const Docker::Container& container)
{
- Option<pid_t> pid = container.get().pid();
+ Option<pid_t> pid = container.pid();
if (pid.isNone()) {
return Failure("Container is not running");
}
- Try<ResourceStatistics> usage =
+
+ // Note that here getting the root pid is enough because
+ // the root process acts as an 'init' process in the docker
+ // container, so no other child processes will escape it.
+ Try<ResourceStatistics> statistics =
mesos::internal::usage(pid.get(), true, true);
- if (usage.isError()) {
- return Failure(usage.error());
+ if (statistics.isError()) {
+ return Failure(statistics.error());
+ }
+
+ ResourceStatistics result = statistics.get();
+
+ // Set the resource allocations.
+ Resources resource = resources[containerId];
+ Option<Bytes> mem = resource.mem();
+ if (mem.isSome()) {
+ result.set_mem_limit_bytes(mem.get().bytes());
+ }
+
+ Option<double> cpus = resource.cpus();
+ if (cpus.isSome()) {
+ result.set_cpus_limit(cpus.get());
}
- return usage.get();
+ return result;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ee998e41/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index c0b915a..a6cae24 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -24,8 +24,10 @@
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
-#include "slave/slave.hpp"
#include "slave/containerizer/docker.hpp"
+#include "slave/slave.hpp"
+#include "slave/state.hpp"
+
using namespace mesos;
using namespace mesos::internal;
@@ -148,8 +150,6 @@ TEST_F(DockerContainerizerTest, DOCKER_Launch)
task.mutable_command()->CopyFrom(command);
- Future<TaskStatus> statusRunning;
-
vector<TaskInfo> tasks;
tasks.push_back(task);
@@ -159,14 +159,14 @@ TEST_F(DockerContainerizerTest, DOCKER_Launch)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
- AWAIT_READY(containerId);
-
+ AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
@@ -206,12 +206,13 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage)
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
+ flags.resources = Option<string>("cpus:2;mem:1024");
Docker docker(tests::flags.docker);
MockDockerContainerizer dockerContainerizer(flags, true, docker);
- Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
@@ -245,11 +246,11 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage)
CommandInfo command;
CommandInfo::ContainerInfo* containerInfo = command.mutable_container();
containerInfo->set_image("docker://busybox");
- command.set_value("sleep 120");
- task.mutable_command()->CopyFrom(command);
+ // Run a CPU intensive command, so we can measure utime and stime later.
+ command.set_value("dd if=/dev/zero of=/dev/null");
- Future<TaskStatus> statusRunning;
+ task.mutable_command()->CopyFrom(command);
vector<TaskInfo> tasks;
tasks.push_back(task);
@@ -260,26 +261,51 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage)
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
- AWAIT_READY(containerId);
-
+ AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
- Future<ResourceStatistics> usage =
- dockerContainerizer.usage(containerId.get());
- AWAIT_READY(usage);
- // TODO(yifan): Verify the usage.
+ // Verify the usage.
+ ResourceStatistics statistics;
+ Duration waited = Duration::zero();
+ do {
+ Future<ResourceStatistics> usage =
+ dockerContainerizer.usage(containerId.get());
+ AWAIT_READY(usage);
+
+ statistics = usage.get();
+
+ if (statistics.cpus_user_time_secs() > 0 &&
+ statistics.cpus_system_time_secs() > 0) {
+ break;
+ }
+
+ os::sleep(Milliseconds(200));
+ waited += Milliseconds(200);
+ } while (waited < Seconds(3));
+
+ EXPECT_EQ(2, statistics.cpus_limit());
+ EXPECT_EQ(Gigabytes(1).bytes(), statistics.mem_limit_bytes());
+ EXPECT_LT(0, statistics.cpus_user_time_secs());
+ EXPECT_LT(0, statistics.cpus_system_time_secs());
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
dockerContainerizer.destroy(containerId.get());
- // Usage() should fail again since the container is destroyed.
- usage = dockerContainerizer.usage(containerId.get());
+ AWAIT_READY(termination);
+
+ // Usage() should fail again since the container is destroyed
+ Future<ResourceStatistics> usage =
+ dockerContainerizer.usage(containerId.get());
AWAIT_FAILED(usage);
driver.stop();
http://git-wip-us.apache.org/repos/asf/mesos/blob/ee998e41/src/tests/docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_tests.cpp b/src/tests/docker_tests.cpp
index 62d5657..b7a7b6f 100644
--- a/src/tests/docker_tests.cpp
+++ b/src/tests/docker_tests.cpp
@@ -60,7 +60,7 @@ TEST(DockerTest, DOCKER_interface)
}
// Start the container.
- status = docker.run("busybox", "sleep 120", containerName);
+ status = docker.run("busybox", "sleep 120", containerName, resources);
AWAIT_READY(status);
ASSERT_SOME(status.get());
@@ -139,7 +139,7 @@ TEST(DockerTest, DOCKER_interface)
// directly, instead of killing and rm.
//
// First, Invoke docker.run()
- status = docker.run("busybox", "sleep 120", containerName);
+ status = docker.run("busybox", "sleep 120", containerName, resources);
AWAIT_READY(status);
ASSERT_SOME(status.get());