You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2015/09/16 22:39:09 UTC
mesos git commit: Added health check support for docker command tasks.
Repository: mesos
Updated Branches:
refs/heads/master 482c20cc2 -> 5d6a06082
Added health check support for docker command tasks.
Review: https://reviews.apache.org/r/37505
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5d6a0608
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5d6a0608
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5d6a0608
Branch: refs/heads/master
Commit: 5d6a06082ce396582add2e3e8497e1f4cdf91ec1
Parents: 482c20c
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Sep 16 17:54:26 2015 +0000
Committer: Timothy Chen <tn...@gmail.com>
Committed: Wed Sep 16 20:16:19 2015 +0000
----------------------------------------------------------------------
src/docker/docker.cpp | 42 ++++--
src/docker/docker.hpp | 8 ++
src/docker/executor.cpp | 165 +++++++++++++++++++++-
src/slave/containerizer/docker.cpp | 16 +--
src/tests/health_check_tests.cpp | 235 +++++++++++++++++++++++++++++++-
5 files changed, 432 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5d6a0608/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index 553e831..c4c37cb 100755
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -122,22 +122,10 @@ Try<Docker*> Docker::create(
}
#endif // __linux__
- // Validate the version (and that we can use Docker at all).
- Future<Version> version = docker->version();
-
- if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
+ Try<Nothing> validateVersion = docker->validateVersion(Version(1, 0, 0));
+ if (validateVersion.isError()) {
delete docker;
- return Error("Timed out getting docker version");
- }
-
- if (version.isFailed()) {
- delete docker;
- return Error("Failed to get docker version: " + version.failure());
- }
-
- if (version.get() < Version(1, 0, 0)) {
- delete docker;
- return Error("Insufficient version of Docker. Please upgrade to >= 1.0.0");
+ return Error(validateVersion.error());
}
return docker;
@@ -224,6 +212,30 @@ Future<Version> Docker::__version(const Future<string>& output)
}
+// Returns Nothing() if the docker CLI reports a version that is at least
+// 'minVersion'; otherwise returns an Error saying whether the version
+// query timed out, failed, or reported an insufficient version. Blocks
+// the caller for up to DOCKER_VERSION_WAIT_TIMEOUT while waiting for
+// version() to complete.
+Try<Nothing> Docker::validateVersion(const Version& minVersion) const
+{
+  // Validate the version (and that we can use Docker at all).
+  Future<Version> version = this->version();
+
+  if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
+    return Error("Timed out getting docker version");
+  }
+
+  if (version.isFailed()) {
+    return Error("Failed to get docker version: " + version.failure());
+  }
+
+  // await() also returns for a discarded future; only a READY future
+  // may be read with get().
+  if (version.isDiscarded()) {
+    return Error("Failed to get docker version: the request was discarded");
+  }
+
+  if (version.get() < minVersion) {
+    // Both versions are quoted consistently, e.g.
+    // "Insufficient version '1.2.0' of Docker. Please upgrade to >= '1.3.0'".
+    return Error(
+        "Insufficient version '" + stringify(version.get()) +
+        "' of Docker. Please upgrade to >= '" + stringify(minVersion) + "'");
+  }
+
+  return Nothing();
+}
+
+
Try<Docker::Container> Docker::Container::create(const string& output)
{
Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
http://git-wip-us.apache.org/repos/asf/mesos/blob/5d6a0608/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index aaf8884..c04757c 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -152,6 +152,14 @@ public:
const std::string& image,
bool force = false) const;
+ // Validate current docker version is not less than minVersion.
+ virtual Try<Nothing> validateVersion(const Version& minVersion) const;
+
+ virtual std::string getPath()
+ {
+ return path;
+ }
+
protected:
// Uses the specified path to the Docker CLI tool.
Docker(const std::string& _path,
http://git-wip-us.apache.org/repos/asf/mesos/blob/5d6a0608/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 6647075..6809e4a 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -30,6 +30,7 @@
#include <process/owned.hpp>
#include <stout/flags.hpp>
+#include <stout/protobuf.hpp>
#include <stout/os.hpp>
#include "common/status_utils.hpp"
@@ -40,10 +41,13 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"
+#include "messages/messages.hpp"
+
using std::cerr;
using std::cout;
using std::endl;
using std::string;
+using std::vector;
namespace mesos {
namespace internal {
@@ -70,8 +74,12 @@ public:
const string& containerName,
const string& sandboxDirectory,
const string& mappedDirectory,
- const Duration& stopTimeout)
+ const Duration& stopTimeout,
+ const string& healthCheckDir)
: killed(false),
+ killedByHealthCheck(false),
+ healthPid(-1),
+ healthCheckDir(healthCheckDir),
docker(docker),
containerName(containerName),
sandboxDirectory(sandboxDirectory),
@@ -168,12 +176,19 @@ public:
return Nothing();
}));
+
+ inspect.onReady(
+ defer(self(), &Self::launchHealthCheck, containerName, task));
}
void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
cout << "Killing docker task" << endl;
shutdown(driver);
+ if (healthPid != -1) {
+ // Cleanup health check process.
+ ::kill(healthPid, SIGKILL);
+ }
}
void frameworkMessage(ExecutorDriver* driver, const string& data) {}
@@ -196,6 +211,40 @@ public:
void error(ExecutorDriver* driver, const string& message) {}
+protected:
+ // Registers the handler for TaskHealthStatus messages: each message is
+ // unpacked into (task_id, healthy, kill_task) and passed to
+ // taskHealthUpdated(). NOTE(review): the sender is presumably the
+ // mesos-health-check subprocess, which launchHealthCheck() starts with
+ // --executor set to this process's PID.
+ virtual void initialize()
+ {
+ install<TaskHealthStatus>(
+ &Self::taskHealthUpdated,
+ &TaskHealthStatus::task_id,
+ &TaskHealthStatus::healthy,
+ &TaskHealthStatus::kill_task);
+ }
+
+  // Handler for TaskHealthStatus messages. Forwards the health result to
+  // the scheduler as a TASK_RUNNING update carrying the 'healthy' flag and,
+  // when 'initiateTaskKill' is set, kills the task and records that the
+  // kill came from the health check.
+  void taskHealthUpdated(
+      const TaskID& taskID,
+      const bool& healthy,
+      const bool& initiateTaskKill)
+  {
+    // Reports can only be forwarded once the executor driver is known;
+    // they are dropped otherwise.
+    if (driver.isSome()) {
+      cout << "Received task health update, healthy: "
+           << stringify(healthy) << endl;
+
+      TaskStatus update;
+      update.set_state(TASK_RUNNING);
+      update.set_healthy(healthy);
+      update.mutable_task_id()->CopyFrom(taskID);
+
+      ExecutorDriver* executorDriver = driver.get();
+      executorDriver->sendStatusUpdate(update);
+
+      if (initiateTaskKill) {
+        // Remembered so the terminal status update is marked unhealthy.
+        killedByHealthCheck = true;
+        killTask(executorDriver, taskID);
+      }
+    }
+  }
+
private:
void reaped(
ExecutorDriver* _driver,
@@ -233,6 +282,9 @@ private:
taskStatus.mutable_task_id()->CopyFrom(taskId);
taskStatus.set_state(state);
taskStatus.set_message(message);
+ if (killed && killedByHealthCheck) {
+ taskStatus.set_healthy(false);
+ }
driver.get()->sendStatusUpdate(taskStatus);
@@ -247,7 +299,104 @@ private:
}));
}
+  // Returns 's' wrapped in single quotes so that a POSIX shell treats it as
+  // one word and performs no expansion on it ('$', '`', '\' and '"' are all
+  // literal inside single quotes). An embedded single quote is emitted as
+  // '\'' (close quote, escaped quote, reopen quote).
+  static string shellQuote(const string& s)
+  {
+    string quoted = "'";
+    for (char c : s) {
+      if (c == '\'') {
+        quoted += "'\\''";
+      } else {
+        quoted += c;
+      }
+    }
+    quoted += "'";
+    return quoted;
+  }
+
+  // Launches the 'mesos-health-check' binary found in 'healthCheckDir' to
+  // run the task's command health check inside 'containerName' through
+  // `docker exec`. The checker receives this process's PID via
+  // '--executor'. Setup failures are logged to stderr and no checker is
+  // started. On success the checker's pid is stored in 'healthPid' so
+  // killTask() can clean it up.
+  void launchHealthCheck(const string& containerName, const TaskInfo& task)
+  {
+    if (killed || !task.has_health_check()) {
+      return;
+    }
+
+    HealthCheck healthCheck = task.health_check();
+
+    if (!healthCheck.has_command()) {
+      cerr << "Unable to launch health process: "
+           << "Only command health check is supported now." << endl;
+      return;
+    }
+
+    CommandInfo command = healthCheck.command();
+
+    // "docker exec" requires docker 1.3.0 or later.
+    Try<Nothing> validateVersion = docker->validateVersion(Version(1, 3, 0));
+    if (validateVersion.isError()) {
+      cerr << "Unable to launch health process: "
+           << validateVersion.error() << endl;
+      return;
+    }
+
+    if (!command.has_value()) {
+      cerr << "Unable to launch health process: "
+           << (command.shell() ? "Shell command is not specified."
+                               : "Executable path is not specified.")
+           << endl;
+      return;
+    }
+
+    // Wrap the original health check command in "docker exec". The result
+    // is run through a shell by the health checker (shell is forced to
+    // true below), so every variable token is single-quoted. Otherwise
+    // '$', '`' and '"' in the user's command would be interpreted by the
+    // shell on the host rather than inside the container, and arguments
+    // containing whitespace would be split.
+    vector<string> argv;
+    argv.push_back(shellQuote(docker->getPath()));
+    argv.push_back("exec");
+    argv.push_back(shellQuote(containerName));
+
+    if (command.shell()) {
+      argv.push_back("sh");
+      argv.push_back("-c");
+      argv.push_back(shellQuote(command.value()));
+    } else {
+      argv.push_back(shellQuote(command.value()));
+
+      // For non-shell commands CommandInfo follows execlp() semantics:
+      // arguments(0) is argv[0] for 'value'. `docker exec` derives argv[0]
+      // from the executable itself, so only forward the rest.
+      for (int i = 1; i < command.arguments_size(); i++) {
+        argv.push_back(shellQuote(command.arguments(i)));
+      }
+    }
+
+    command.set_shell(true);
+    command.clear_arguments();
+    command.set_value(strings::join(" ", argv));
+    healthCheck.mutable_command()->CopyFrom(command);
+
+    JSON::Object json = JSON::Protobuf(healthCheck);
+
+    // Launch the subprocess using 'exec' style so that quotes can
+    // be properly handled.
+    const string checkerPath =
+      path::join(healthCheckDir, "mesos-health-check");
+
+    vector<string> checkerArgv;
+    checkerArgv.push_back(checkerPath);
+    checkerArgv.push_back("--executor=" + stringify(self()));
+    checkerArgv.push_back("--health_check_json=" + stringify(json));
+    checkerArgv.push_back("--task_id=" + task.task_id().value());
+
+    cout << "Launching health check process: "
+         << strings::join(" ", checkerArgv) << endl;
+
+    Try<Subprocess> healthProcess =
+      process::subprocess(
+          checkerPath,
+          checkerArgv,
+          // Intentionally not sending STDIN to avoid health check
+          // commands that expect STDIN input to block.
+          Subprocess::PATH("/dev/null"),
+          Subprocess::FD(STDOUT_FILENO),
+          Subprocess::FD(STDERR_FILENO));
+
+    if (healthProcess.isError()) {
+      cerr << "Unable to launch health process: "
+           << healthProcess.error() << endl;
+      return;
+    }
+
+    healthPid = healthProcess.get().pid();
+
+    cout << "Health check process launched at pid: "
+         << stringify(healthPid) << endl;
+  }
+
bool killed;
+ bool killedByHealthCheck;
+ pid_t healthPid;
+ string healthCheckDir;
Owned<Docker> docker;
string containerName;
string sandboxDirectory;
@@ -268,14 +417,16 @@ public:
const string& container,
const string& sandboxDirectory,
const string& mappedDirectory,
- const Duration& stopTimeout)
+ const Duration& stopTimeout,
+ const string& healthCheckDir)
{
process = Owned<DockerExecutorProcess>(new DockerExecutorProcess(
docker,
container,
sandboxDirectory,
mappedDirectory,
- stopTimeout));
+ stopTimeout,
+ healthCheckDir));
spawn(process.get());
}
@@ -414,12 +565,18 @@ int main(int argc, char** argv)
return EXIT_FAILURE;
}
+ const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
+ string path =
+ envPath.isSome() ? envPath.get()
+ : os::realpath(Path(argv[0]).dirname()).get();
+
mesos::internal::docker::DockerExecutor executor(
process::Owned<Docker>(docker.get()),
flags.container.get(),
flags.sandbox_directory.get(),
flags.mapped_directory.get(),
- flags.stop_timeout.get());
+ flags.stop_timeout.get(),
+ path);
mesos::MesosExecutorDriver driver(&executor);
return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5d6a0608/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 289d4ec..efa3726 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -131,18 +131,10 @@ Try<DockerContainerizer*> DockerContainerizer::create(
Shared<Docker> docker(create.get());
if (flags.docker_mesos_image.isSome()) {
- Future<Version> version = docker->version();
- if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
- return Error("Timed out waiting for docker version");
- }
-
- if (version.isFailed()) {
- return Error(version.failure());
- }
-
- if (version.get() < Version(1, 5, 0)) {
- string message = "Docker with mesos images requires docker 1.5+, found ";
- message += stringify(version.get());
+ Try<Nothing> validateResult = docker->validateVersion(Version(1, 5, 0));
+ if (validateResult.isError()) {
+ string message = "Docker with mesos images requires docker 1.5+";
+ message += validateResult.error();
return Error(message);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5d6a0608/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index 157a56a..ff6275b 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -21,10 +21,14 @@
#include <process/clock.hpp>
#include <process/future.hpp>
+#include <process/owned.hpp>
#include <process/pid.hpp>
+#include "docker/docker.hpp"
+
#include "slave/slave.hpp"
+#include "slave/containerizer/docker.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "tests/containerizer.hpp"
@@ -35,6 +39,7 @@
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::DockerContainerizer;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
@@ -42,6 +47,7 @@ using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
+using process::Owned;
using process::PID;
using testing::_;
@@ -68,7 +74,8 @@ public:
const Offer& offer,
int gracePeriodSeconds = 0,
const Option<int>& consecutiveFailures = None(),
- const Option<map<string, string> >& env = None())
+ const Option<map<string, string>>& env = None(),
+ const Option<ContainerInfo>& containerInfo = None())
{
CommandInfo healthCommand;
healthCommand.set_value(healthCmd);
@@ -79,7 +86,8 @@ public:
offer,
gracePeriodSeconds,
consecutiveFailures,
- env);
+ env,
+ containerInfo);
}
vector<TaskInfo> populateTasks(
@@ -88,7 +96,8 @@ public:
const Offer& offer,
int gracePeriodSeconds = 0,
const Option<int>& consecutiveFailures = None(),
- const Option<map<string, string> >& env = None())
+ const Option<map<string, string>>& env = None(),
+ const Option<ContainerInfo>& containerInfo = None())
{
TaskInfo task;
task.set_name("");
@@ -109,6 +118,10 @@ public:
task.mutable_command()->CopyFrom(command);
+ if (containerInfo.isSome()) {
+ task.mutable_container()->CopyFrom(containerInfo.get());
+ }
+
HealthCheck healthCheck;
if (env.isSome()) {
@@ -238,6 +251,98 @@ TEST_F(HealthCheckTest, HealthyTask)
}
+// Testing a healthy task reporting one healthy status to scheduler for
+// docker executor.
+TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthyTask)
+{
+  // Check the result of Docker::create() so that a broken docker setup
+  // fails the test cleanly instead of calling get() on an errored Try.
+  Try<Docker*> create = Docker::create(
+      tests::flags.docker,
+      tests::flags.docker_socket,
+      false);
+  ASSERT_SOME(create);
+
+  Owned<Docker> docker(create.get());
+
+  Try<Nothing> validateResult = docker->validateVersion(Version(1, 3, 0));
+  ASSERT_SOME(validateResult)
+    << "-------------------------------------------------------------\n"
+    << "We cannot run this test because of 'docker exec' command \n"
+    << "require docker version greater than '1.3.0'. You won't be \n"
+    << "able to use the docker exec method, but feel free to disable\n"
+    << "this test.\n"
+    << "-------------------------------------------------------------";
+
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<DockerContainerizer*> containerizer =
+    DockerContainerizer::create(flags, &fetcher);
+  CHECK_SOME(containerizer);
+
+  Try<PID<Slave>> slave = StartSlave(containerizer.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  vector<TaskInfo> tasks = populateTasks(
+      "sleep 120", "exit 0", offers.get()[0], 0, None(), None(), containerInfo);
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusHealth;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusHealth));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  AWAIT_READY(statusHealth);
+  EXPECT_EQ(TASK_RUNNING, statusHealth.get().state());
+  EXPECT_TRUE(statusHealth.get().has_healthy());
+  EXPECT_TRUE(statusHealth.get().healthy());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  // NOTE(review): assumes StartSlave() does not take ownership of a
+  // caller-provided containerizer; free the one created above now that
+  // the slave has been shut down.
+  delete containerizer.get();
+
+  Future<std::list<Docker::Container>> containers =
+    docker->ps(true, slave::DOCKER_NAME_PREFIX);
+
+  AWAIT_READY(containers);
+
+  // Cleanup all mesos launched containers.
+  foreach (const Docker::Container& container, containers.get()) {
+    AWAIT_READY_FOR(docker->rm(container.id, true), Seconds(30));
+  }
+}
+
+
// Same as above, but use the non-shell version of the health command.
TEST_F(HealthCheckTest, HealthyTaskNonShell)
{
@@ -395,6 +500,130 @@ TEST_F(HealthCheckTest, HealthStatusChange)
}
+// Testing health status change reporting to scheduler for docker executor.
+TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthStatusChange)
+{
+  // Check the result of Docker::create() so that a broken docker setup
+  // fails the test cleanly instead of calling get() on an errored Try.
+  Try<Docker*> create = Docker::create(
+      tests::flags.docker,
+      tests::flags.docker_socket,
+      false);
+  ASSERT_SOME(create);
+
+  Owned<Docker> docker(create.get());
+
+  Try<Nothing> validateResult = docker->validateVersion(Version(1, 3, 0));
+  ASSERT_SOME(validateResult)
+    << "-------------------------------------------------------------\n"
+    << "We cannot run this test because of 'docker exec' command \n"
+    << "require docker version greater than '1.3.0'. You won't be \n"
+    << "able to use the docker exec method, but feel free to disable\n"
+    << "this test.\n"
+    << "-------------------------------------------------------------";
+
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<DockerContainerizer*> containerizer =
+    DockerContainerizer::create(flags, &fetcher);
+  CHECK_SOME(containerizer);
+
+  Try<PID<Slave>> slave = StartSlave(containerizer.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  // Create a temporary file on the host. The health check command below
+  // is expected to run inside the container, where this file is not
+  // visible; checking at the end that the host file is unchanged shows
+  // the command did not run on the host.
+  string tmpPath = path::join(os::getcwd(), "foobar");
+  ASSERT_SOME(os::write(tmpPath, "bar"));
+
+  // This command fails every other invocation.
+  // For all runs i in Nat0, the following case i % 2 applies:
+  //
+  // Case 0:
+  //   - Attempt to remove the nonexistent temporary file.
+  //   - Create the temporary file.
+  //   - Exit with a non-zero status.
+  //
+  // Case 1:
+  //   - Remove the temporary file.
+  string alt = "rm " + tmpPath + " || (mkdir -p " + os::getcwd() +
+               " && echo foo >" + tmpPath + " && exit 1)";
+
+  vector<TaskInfo> tasks = populateTasks(
+      "sleep 120", alt, offers.get()[0], 0, 3, None(), containerInfo);
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusHealth1;
+  Future<TaskStatus> statusHealth2;
+  Future<TaskStatus> statusHealth3;
+
+  // The health command keeps alternating, so further health updates may
+  // arrive before the driver is stopped; ignore them instead of failing
+  // on an unexpected extra call.
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusHealth1))
+    .WillOnce(FutureArg<1>(&statusHealth2))
+    .WillOnce(FutureArg<1>(&statusHealth3))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  AWAIT_READY(statusHealth1);
+  EXPECT_EQ(TASK_RUNNING, statusHealth1.get().state());
+  EXPECT_FALSE(statusHealth1.get().healthy());
+
+  AWAIT_READY(statusHealth2);
+  EXPECT_EQ(TASK_RUNNING, statusHealth2.get().state());
+  EXPECT_TRUE(statusHealth2.get().healthy());
+
+  AWAIT_READY(statusHealth3);
+  EXPECT_EQ(TASK_RUNNING, statusHealth3.get().state());
+  EXPECT_FALSE(statusHealth3.get().healthy());
+
+  // Check that the temporary file created on the host still exists and
+  // that its content is unchanged.
+  ASSERT_SOME(os::read(tmpPath));
+  EXPECT_EQ("bar", os::read(tmpPath).get());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  // NOTE(review): assumes StartSlave() does not take ownership of a
+  // caller-provided containerizer; free the one created above now that
+  // the slave has been shut down.
+  delete containerizer.get();
+
+  Future<std::list<Docker::Container>> containers =
+    docker->ps(true, slave::DOCKER_NAME_PREFIX);
+
+  AWAIT_READY(containers);
+
+  // Cleanup all mesos launched containers.
+  foreach (const Docker::Container& container, containers.get()) {
+    AWAIT_READY_FOR(docker->rm(container.id, true), Seconds(30));
+  }
+}
+
+
// Testing killing task after number of consecutive failures.
// Temporarily disabled due to MESOS-1613.
TEST_F(HealthCheckTest, DISABLED_ConsecutiveFailures)